ldap-python-webui/app.py

1219 lines
40 KiB
Python

#!/usr/bin/env python3
'''
ldap-python-webui :: LDAP kudeaketarako Web Interfazea - Web UI for LDAP management.
Copyright (C) 2022 Aitzol Berasategi - Wproject
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
import bottle
from bottle import get, post, static_file, request, route, template, error
from bottle import SimpleTemplate
#from bottle.ext import beaker
from beaker.middleware import SessionMiddleware
from configparser import ConfigParser
from ldap3 import Server, Connection, ALL
from ldap3 import SIMPLE, SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE, ALL_ATTRIBUTES
from ldap3.core.exceptions import LDAPBindError, LDAPConstraintViolationResult, \
LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError, \
LDAPSocketOpenError, LDAPExceptionError, LDAPAttributeOrValueExistsResult, \
LDAPNoSuchAttributeResult
import logging
from os import getenv, environ, path, remove
from libs import flist, slist
from libs.localization import *
from libs.helper import tools
import random
from user_agents import parse as ua_parse
from datetime import datetime
import cryptocode
import base64
BASE_DIR = path.dirname(__file__)
LOG = logging.getLogger(__name__)
LOG_FORMAT = '%(asctime)s %(levelname)s: %(message)s'
VERSION = '0.0.3'
@get('/')
def get_index():
try:
return user_tpl(data=newSession().get(), str=i18n.str)
except Exception as e:
return index_tpl(str=i18n.str)
@get('/user')
def get_index():
try:
print('SESSION:',newSession().get())
return user_tpl(data=newSession().get(), str=i18n.str)
except Exception as e:
return index_tpl(str=i18n.str)
@get('/signup')
def get_index():
newSession()
if allowed:
return signup_tpl(str=i18n.str)
else:
return index_tpl(alerts=[('error', i18n.msg[28], 'upDown')], str=i18n.str)
@get('/change_pwd')
def get_index():
try:
return change_pwd_tpl(data=newSession().get(), str=i18n.str)
except Exception as e:
return index_tpl(str=i18n.str)
@get('/edit_fullname')
def get_index():
try:
return edit_fullname_tpl(data=newSession().get(), str=i18n.str)
except Exception as e:
return index_tpl(str=i18n.str)
@get('/edit_email')
def get_index():
try:
return edit_email_tpl(data=newSession().get(), str=i18n.str)
except Exception as e:
return index_tpl(str=i18n.str)
@get('/logs')
def get_index():
try:
return logs_tpl(data=newSession().get(), str=i18n.str)
except Exception as e:
return index_tpl(str=i18n.str)
@get('/delete')
def get_index():
try:
return delete_tpl(data=newSession().get(), str=i18n.str)
except Exception as e:
return index_tpl(str=i18n.str)
@get('/logout')
def get_index():
def error(msg):
return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
try:
username = newSession().get()['username']
if(username is not None):
logout(username)
except Error as e:
LOG.warning("Unsuccessful attempt to log out: %s" % e)
return error(str(e))
return index_tpl(alerts=[('success', i18n.msg[0], 'fadeOut')], str=i18n.str)
@get('/_2fa')
def get_index():
try:
reload(newSession().get()['username'], None, None)
return _2fa_tpl(data=newSession().get(), str=i18n.str)
except Exception as e:
return index_tpl(str=i18n.str)
@get('/enable_2fa')
@get('/disable_2fa')
def get_index():
try:
return user_tpl(data=newSession().get(), str=i18n.str)
except Exception as e:
return index_tpl(str=i18n.str)
@post('/user')
def post_user():
form = request.forms.getunicode
def error(msg):
return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
if len(form('username')) < 3:
return error(i18n.msg[2])
elif not tools.input_validation(form('username')):
return error(i18n.msg[3])
if not tools.pwd_validation(form('password')):
return error(i18n.msg[21])
try:
login(form('username'), form('password'))
except Error as e:
LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e))
return error(str(e))
try:
#if(check_2fa_step1(form('username'))):
if(newSession().get()['secureAuth']):
# encrypt and store the credentials
key = tools.key()
data = ';'.join([form('username'),form('password'),newSession().get()['authCode']])
data_enc = cryptocode.encrypt(data, key)
data_to_url = base64.urlsafe_b64encode(str.encode(data_enc))
memo.data = data_enc
memo.key = key
logout(form('username'))
return index_tpl(two_factor_authentication=True, path=data_to_url, str=i18n.str)
except Error as e:
#On error force disable 2fa
add_auth_attribute_step1(form('username'), None, action='disable')
LOG.debug("Two factor authentication has been impossible.")
return error(i18n.msg[29])
return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str)
@post('/user/<path>')
def post_user_step2(path):
form = request.forms.getunicode
def error(msg):
return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
try:
# decrypt url
path = base64.urlsafe_b64decode(path)
path = cryptocode.decrypt(path.decode('utf-8'), memo.key)
data = path.split(';')
username = data[0]
password = data[1]
secret = data[2]
except:
newSession()
return error(i18n.msg[27])
#if not tools._2fa_validation(form('token'), newSession().get()['authCode']):
if not tools._2fa_validation(form('token'), secret):
return error(i18n.msg[6])
else:
try:
login(username, password)
except Error as e:
LOG.warning("Unsuccessful attempt to login %s: %s" % (username, e))
return error(str(e))
return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], newSession().get()['username']), 'fadeOut' )], data=newSession().get(), str=i18n.str)
@post('/signup')
def post_signup():
#ensure that i18n exists
if 'i18n' not in globals():
newSession()
form = request.forms.getunicode
isFake = False
db = 'data/invite-codes.db'
def auto_complete(arg):
if arg == 'firstname':
result = random.choice(flist.firstname)
elif arg == 'surname':
result = random.choice(slist.surname)
return(result.capitalize())
def error(msg):
return signup_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
try:
if not tools.code_is_valid(form('invite_code'), db):
return(error(i18n.msg[6]))
except Exception as e:
LOG.error(e)
return(error(i18n.msg[6]))
if len(form('username')) < 3:
return error(i18n.msg[2])
username = form('username').lower()
if not tools.input_validation(username):
return error(i18n.msg[3])
if len(form('firstname')) == 0:
firstname = auto_complete('firstname')
isFake = True
else:
firstname = form('firstname').title()
if len(form('surname')) == 0:
surname = auto_complete('surname')
isFake = True
else:
surname = form('surname').title()
email = form('email').lower()
if not tools.email_validation(email):
return error(i18n.msg[15])
if not tools.pwd_validation(form('password')):
return error(i18n.msg[8])
elif form('password') != form('confirm-password'):
return error(i18n.msg[7])
try:
account_request(username, firstname, surname, form('password'), email, isFake, get_dev())
except Error as e:
LOG.warning("Unsuccessful attempt to create an account for %s: %s" % (form('username'), e))
return error(str(e))
try:
tools.mark_code_as_used(form('invite_code'), db)
except Error as e:
LOG.warning("There was a problem verifying the invitation code, please try again later.", e)
return error(str(e))
LOG.info("New account successfully created for %s" % form('username'))
return index_tpl(alerts=[('success', i18n.msg[9], 'fadeOut')], str=i18n.str)
@post('/edit_fullname')
def post_edit_fullname():
form = request.forms.getunicode
try:
username = newSession().get()['username']
old_firstname = newSession().get()['firstname']
old_surname = newSession().get()['surname']
except Error as e:
return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str)
def error(msg):
return edit_fullname_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str)
if len(form('firstname')) < 3:
return error(i18n.msg[11])
elif not tools.input_validation(form('firstname'), True):
return error(i18n.msg[4])
if len(form('surname')) < 3:
return error(i18n.msg[12])
elif not tools.input_validation(form('surname'), True):
return error(i18n.msg[5])
try:
edit_fullname(username, old_firstname, old_surname, form('firstname').title(), form('surname').title())
except Error as e:
LOG.warning("Unsuccessful attempt to edit fullname for %s: %s" % (username, e))
return error(str(e))
return user_tpl(alerts=[('success', i18n.msg[13], 'fadeOut' )], data=newSession().get(), str=i18n.str)
@post('/edit_email')
def post_edit_email():
form = request.forms.getunicode
try:
username = newSession().get()['username']
old_email = newSession().get()['mail']
except Error as e:
return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str)
def error(msg):
return edit_email_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str)
if not tools.email_validation(form('email')):
return(error(i18n.msg[14]))
try:
edit_email(username, old_email, form('email').lower())
except Error as e:
LOG.warning("Unsuccessful attempt to change email address for %s: %s" % (username, e))
return error(str(e))
return user_tpl(alerts=[('success', i18n.msg[16], 'fadeOut' )], data=newSession().get(), str=i18n.str)
@post('/enable_2fa')
def post_enable_2fa():
def error(msg):
return _2fa_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str)
try:
if(not newSession().get()['secureAuth']):
try:
username=newSession().get()['username']
add_auth_attribute_step1(username, tools.gen_secret(), action='enable')
except Error as e:
reload(newSession().get()['username'], None, None)
LOG.warning(e)
return error(i18n.msg[30])
except Error as e:
LOG.warning(e)
return index_tpl(alerts=[('error', e, 'fadeOut')], str=i18n.str)
return _2fa_tpl(alerts=[('success', i18n.msg[31], 'fadeOut')], data=newSession().get(), str=i18n.str)
@post('/disable_2fa')
def post_disable_2fa():
def error(msg):
return _2fa_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str)
try:
if(newSession().get()['secureAuth']):
try:
username=newSession().get()['username']
add_auth_attribute_step1(username, None, action='disable')
except Error as e:
reload(newSession().get()['username'], None, None)
LOG.warning(e)
return error(str(e))
except Error as e:
LOG.warning(e)
return index_tpl(alerts=[('error', e, 'fadeOut')], str=i18n.str)
return _2fa_tpl(alerts=[('error', i18n.msg[32], 'fadeOut')], data=newSession().get(), str=i18n.str)
@post('/change_pwd')
def post_change_pwd():
form = request.forms.getunicode
try:
username=newSession().get()['username']
except Error as e:
return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str)
def error(msg):
return change_pwd_tpl(username=username, alerts=[('error', msg, 'fadeOut')], str=i18n.str)
if not tools.pwd_validation(form('old-password')):
return error(i18n.msg[26])
elif (not tools.pwd_validation(form('new-password')) or not tools.pwd_validation(form('confirm-password'))):
return error(i18n.msg[8]) #mezua aldatu egin behar da
elif form('new-password') != form('confirm-password'):
return error(i18n.msg[7])
elif form('old-password') == form('confirm-password'):
return error(i18n.msg[17])
try:
change_passwords(username, form('old-password'), form('new-password'))
logout(username)
except Error as e:
LOG.warning("Unsuccessful attempt to change password for %s: %s" % (username, e))
return error(str(e))
LOG.info("Password successfully changed for: %s" % username)
return index_tpl(alerts=[('success', i18n.msg[18], 'fadeOut')], username=username, str=i18n.str)
@post('/delete')
def post_delete():
form = request.forms.getunicode
def error(msg):
return delete_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
try:
username = newSession().get()['username']
if(tools.input_validation(form('username')) and form('username').lower() == username):
del_user(username)
else:
return(error(i18n.msg[19]))
except Error as e:
LOG.warning("Unsuccessful attempt to delete the account: %s" % e)
return error(str(e))
LOG.info("Account successfully deleted")
return index_tpl(alerts=[('success', i18n.msg[20], 'fadeOut')], str=i18n.str)
@route('/static/<filename>', name='static')
def serve_static(filename):
return static_file(filename, root=path.join(BASE_DIR, 'static'))
@get("/static/fonts/<filepath:re:.*\\.(eot|otf|svg|ttf|woff|woff2?)>")
def font(filepath):
return static_file(filepath, root="static/fonts")
@get("/static/tmp/<filepath:re:.*\\.(png|svg)>")
def font(filepath):
return static_file(filepath, root="static/tmp")
def index_tpl(**kwargs):
return template('index', **kwargs)
def user_tpl(**kwargs):
return template('user', **kwargs)
def signup_tpl(**kwargs):
return template('signup', **kwargs)
def change_pwd_tpl(**kwargs):
return template('change_pwd', **kwargs)
def edit_email_tpl(**kwargs):
return template('edit_email', **kwargs)
def edit_fullname_tpl(**kwargs):
return template('edit_fullname', **kwargs)
def delete_tpl(**kwargs):
return template('delete', **kwargs)
def logs_tpl(**kwargs):
return template('logs', **kwargs)
def _2fa_tpl(**kwargs):
return template('_2fa', **kwargs)
def connect_ldap(conf, **kwargs):
server = Server(host=conf['host'],
port=conf.getint('port', None),
use_ssl=conf.getboolean('use_ssl', False),
connect_timeout=5)
return Connection(server, raise_exceptions=True, **kwargs)
@error(404)
@error(405)
def error40x(error):
return index_tpl(str=i18n.str)
#LOGIN
def login(username, password):
n = N
for key in (key for key in CONF.sections()
if key == 'ldap' or key.startswith('ldap:')):
LOG.debug("%s is trying to logging in %s" % (username, key))
n -= 1
try:
login_user(CONF[key], username, password)
except Error as e:
if n >=1:
e = []
continue
else:
raise e
break
def login_user(conf, *args):
try:
login_user_ldap(conf, *args)
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
raise Error(i18n.msg[21])
except LDAPSocketOpenError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
except LDAPExceptionError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
def login_user_ldap(conf, username, password):
#set current LDAP
superUser = SuperUsers(conf)
#with connect_ldap(conf) as c:
with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c:
user_dn = find_user_dn(conf, c, username)
# Note: raises LDAPUserNameIsMandatoryError when user_dn is None.
with connect_ldap(conf, authentication=SIMPLE, user=user_dn, password=password) as c:
c.bind()
if is_trusted_device(conf, user_dn):
newSession().set(get_user_data(user_dn, c))
#update timestamp + ip address
update_login_info(conf, user_dn)
LOG.debug("%s logged in to %s" % (username, conf['base']))
#generate qr if it doenst exists when 2fa enable
if(newSession().get()['secureAuth']):
tools.gen_qr(newSession().get()['authCode'])
#LOGOUT
def logout(username):
n = N
for key in (key for key in CONF.sections()
if key == 'ldap' or key.startswith('ldap:')):
LOG.debug("Logging out %s from %s" % (username, key))
n -= 1
try:
logout_user(CONF[key], username)
except Error as e:
if n >=1:
e = []
continue
else:
raise e
break
def logout_user(conf, *args):
try:
logout_user_ldap(conf, *args)
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
raise Error(i18n.msg[21])
except LDAPSocketOpenError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
except LDAPExceptionError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
def logout_user_ldap(conf, username):
#set current LDAP
superUser = SuperUsers(conf)
#with connect_ldap(conf) as c:
with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c:
user_dn = find_user_dn(conf, c, username)
c.unbind()
#newSession().close()
newSession().delete()
LOG.info("%s LOGED OUT" % (username))
#SIGN UP
def account_request(username, firstname, surname, password, email, isFake, device):
created = []
for key in (key for key in CONF.sections()
if key == 'ldap' or key.startswith('ldap:')):
LOG.debug("Creating account for %s on %s server" % (username, key))
try:
new_user_account(CONF[key], username, firstname, surname, password, email, isFake, device)
created.append(key)
except Error as e:
for key in reversed(created):
LOG.info("Reverting account creation in %s for %s" % (key, username))
try:
#Akatsen bat gertatzen bada LDAP instantzia guztietan kontua ezabatu
del_account(CONF[key], username)
except Error as e2:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
raise e
def new_user_account(conf, *args):
try:
register(conf, *args)
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
raise Error(i18n.msg[21])
except LDAPSocketOpenError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[22])
except LDAPExceptionError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
def register(conf, username, firstname, surname, password, email, isFake, device):
def to_ascii(str):
ascii_str=""
for c in str:
if 0 <= ord(c) <= 127:
ascii_str=ascii_str+c
else:
ascii_str=ascii_str+"X"
return(ascii_str)
#set current LDAP
superUser = SuperUsers(conf)
with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
try:
if (find_user_dn(conf,c,username) is not None):
raise Error(i18n.msg[24])
if (find_email(conf,c,email)):
raise Error(i18n.msg[25])
except Error as e:
raise e
else:
#create new account
uidNumber = find_uid_number(conf,c)+1
directory = 'home/user/'+to_ascii(username)
OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement']
ts = datetime.now().strftime('%Y%m%d%H%M%S')+'Z'
attributes = {'gidNumber': '501', 'uidNumber': uidNumber, 'homeDirectory': directory, 'givenName':
firstname, 'sn': surname, 'uid' : username, 'mail': email, 'active': False, 'fakeCn': isFake,
'devices':device, 'ip':request.environ.get('HTTP_X_REAL_IP', request.remote_addr), 'lastLogin': ts,
'secureAuth': False}
#new_user_dn = "cn="+firstname+" "+surname+" - "+username+",cn=users,"+conf['base']
new_user_dn = "cn="+firstname+" "+surname+",cn=users,"+conf['base']
c.add(dn=new_user_dn,object_class=OBJECT_CLASS, attributes=attributes)
#create/change user password
c.extend.standard.modify_password(new_user_dn, '', password)
LOG.info("%s has registered on %s" % (username, conf))
#EDIT FULLNAME
def edit_fullname(username, old_firstname, old_surname, firstname, surname,):
changed = []
for key in (key for key in CONF.sections()
if key == 'ldap' or key.startswith('ldap:')):
LOG.debug("Changing fullname in %s for %s" % (key, username))
try:
new_fullname(CONF[key], username, firstname, surname)
changed.append(key)
LOG.debug("%s changed fullname on %s" % (username, key))
except Error as e:
for key in reversed(changed):
LOG.info("Reverting fullname change in %s for %s" % (key, username))
try:
new_fullname(CONF[key], username, old_firstname, old_surname)
except Error as e2:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
raise e
def new_fullname(conf, *args):
try:
update_fullname(conf, *args)
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
raise Error(i18n.msg[26])
except LDAPConstraintViolationResult as e:
# Extract useful part of the error message (for Samba 4 / AD).
msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
raise Error(msg)
except LDAPSocketOpenError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
except LDAPExceptionError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
def update_fullname(conf, username, firstname, surname):
#set current LDAP
superUser = SuperUsers(conf)
with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
#with connect_ldap(conf) as c:
user_dn = find_user_dn(conf, c, username)
c.modify(user_dn, {'givenName': [( MODIFY_REPLACE, firstname )], 'sn': [( MODIFY_REPLACE, surname )]})
new_cn = "cn="+firstname+" "+ surname+" - "+ username
c.modify_dn(user_dn, new_cn)
new_user_dn = new_cn+",cn=users,"+conf['base']
base = ",cn=users," + conf['base']
fakeFullName = user_dn[3:-len(base)].split(" ")
if(user_dn == new_user_dn):
raise Error(i18n.msg[10])
c.modify(new_user_dn, {'fakeCn': [(MODIFY_REPLACE, 'false' )]})
newSession().set(get_user_data(new_user_dn, c))
#EDIT EMAIL
def edit_email(username, old_email, new_email):
changed = []
for key in (key for key in CONF.sections()
if key == 'ldap' or key.startswith('ldap:')):
LOG.debug("Changing email in %s for %s" % (key, username))
try:
new_email_address(CONF[key], username, old_email, new_email)
changed.append(key)
LOG.debug("%s changed email address on %s" % (username, key))
except Error as e:
for key in reversed(changed):
LOG.info("Reverting email change in %s for %s" % (key, username))
try:
new_email_address(CONF[key], username, new_email, old_email)
except Error as e2:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
raise e
def new_email_address(conf, *args):
try:
update_email_address(conf, *args)
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
raise Error(i18n.msg[26])
except LDAPConstraintViolationResult as e:
# Extract useful part of the error message (for Samba 4 / AD).
msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
raise Error(msg)
except LDAPSocketOpenError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
except LDAPExceptionError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
def update_email_address(conf, username, old_email, new_email):
if(old_email == new_email):
raise Error(i18n.msg[15])
#set current LDAP
superUser = SuperUsers(conf)
with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
user_dn = find_user_dn(conf, c, username)
new_email_addresses = get_user_email_array(user_dn, c, old_email, new_email)
c.modify(user_dn, {'mail': [( MODIFY_REPLACE, new_email_addresses )]})
newSession().set(get_user_data(user_dn, c))
# ADD AUTHCODE ATTRIBUTE - 2FA
def add_auth_attribute_step1(username, code, action):
changed = []
for key in (key for key in CONF.sections()
if key == 'ldap' or key.startswith('ldap:')):
LOG.debug("Adding secureAuth attribute %s to %s" % (key, username))
try:
add_auth_attribute_step2(CONF[key], username, code, action)
changed.append(key)
LOG.debug("%s has activated 2FA authentication on %s" % (username, key))
except Error as e:
for key in reversed(changed):
LOG.info("Reverting 2FA activation in %s for %s due to errors" % (key, username))
try:
new_email_address(CONF[key], username, new_email, old_email)
except Error as e2:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
raise e
def add_auth_attribute_step2(conf, *args):
try:
add_auth_attribute_step3(conf, *args)
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
raise Error(i18n.msg[26])
except LDAPConstraintViolationResult as e:
# Extract useful part of the error message (for Samba 4 / AD).
msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
raise Error(msg)
except LDAPSocketOpenError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
except LDAPNoSuchAttributeResult as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[33])
except LDAPExceptionError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
def add_auth_attribute_step3(conf, username, code, action):
#set current LDAP
superUser = SuperUsers(conf)
with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
user_dn = find_user_dn(conf, c, username)
if(action == 'enable'):
c.modify(user_dn,{'authCode': [(MODIFY_ADD, [code])]})
c.modify(user_dn,{'secureAuth': [MODIFY_REPLACE, [True]]})
elif(action == 'disable'):
c.modify(user_dn,{'authCode': [(MODIFY_DELETE, [])]})
c.modify(user_dn,{'secureAuth': [MODIFY_REPLACE, [False]]})
#remove file
try:
remove('static/tmp/'+newSession().get()['authCode']+'.png')
except OSError as e:
LOG.warning(str(e))
#raise Error(e)
pass
newSession().set(get_user_data(user_dn, c))
reload=add_auth_attribute_step1
#CHANGE PASSWORD
def change_passwords(username, old_pass, new_pass):
changed = []
for key in (key for key in CONF.sections()
if key == 'ldap' or key.startswith('ldap:')):
LOG.debug("Changing password in %s for %s" % (key, username))
try:
change_password(CONF[key], username, old_pass, new_pass)
changed.append(key)
LOG.debug("%s changed pwd on %s" % (username, key))
except Error as e:
for key in reversed(changed):
LOG.info("Reverting password change in %s for %s" % (key, username))
try:
change_password(CONF[key], username, new_pass, old_pass)
except Error as e2:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
raise e
def change_password(conf, *args):
try:
if conf.get('type') == 'ad':
change_password_ad(conf, *args)
else:
change_password_ldap(conf, *args)
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
raise Error(i18n.msg[26])
except LDAPConstraintViolationResult as e:
# Extract useful part of the error message (for Samba 4 / AD).
msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
raise Error(msg)
except LDAPSocketOpenError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
except LDAPExceptionError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
def change_password_ldap(conf, username, old_pass, new_pass):
#set current LDAP
superUser = SuperUsers(conf)
with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c:
#with connect_ldap(conf) as c:
user_dn = find_user_dn(conf, c, username)
# Note: raises LDAPUserNameIsMandatoryError when user_dn is None.
with connect_ldap(conf, authentication=SIMPLE, user=user_dn, password=old_pass) as c:
c.bind()
c.extend.standard.modify_password(user_dn, old_pass, new_pass)
def change_password_ad(conf, username, old_pass, new_pass):
user = username + '@' + conf['ad_domain']
with connect_ldap(conf, authentication=SIMPLE, user=user, password=old_pass) as c:
c.bind()
user_dn = find_user_dn(conf, c, username)
c.extend.microsoft.modify_password(user_dn, new_pass, old_pass)
#DELETE ACCOUNT
def del_user(username):
n = N
for key in (key for key in CONF.sections()
if key == 'ldap' or key.startswith('ldap:')):
LOG.debug("Deleting account for %s from %s" % (username, key))
n -= 1
try:
del_account(CONF[key], username)
LOG.debug("Account for %s deleted on -> %s" % (username, CONF[key]))
if(n == 0 and newSession().get()['username'] is not None):
newSession().delete()
except Error as e:
raise e
def del_account(conf, *args):
try:
delete(conf, *args)
except LDAPSocketOpenError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
except LDAPExceptionError as e:
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
raise Error(i18n.msg[23])
def delete(conf, username):
#set current LDAP
superUser = SuperUsers(conf)
with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
try:
user_dn = find_user_dn(conf, c, username)
c.delete(user_dn)
except Error as e:
raise e
#AUXILIARY FUNCTIONS
#find user
def find_user_dn(conf, conn, uid):
search_filter = conf['search_filter'].replace('{uid}', uid)
conn.search(conf['base'], "(%s)" % search_filter, SUBTREE)
return conn.response[0]['dn'] if conn.response else None
#find email
def find_email(conf, conn, email):
search_filter = '(uid=*)'
if conn.search(conf['base'], search_filter, attributes=['mail']):
for i in conn.response:
for j in i['attributes']['mail']:
if(j == email):
return True
return False
#find device
def find_device(user_dn, conn, device):
search_filter = '(objectClass=*)'
if conn.search(user_dn, search_filter, attributes=['devices']):
for i in conn.response:
for j in i['attributes']['devices']:
if(j == device):
return True
return False
#find highest uidNumber
def find_uid_number(conf, conn):
search_filter = '(uid=*)'
if conn.search(conf['base'], search_filter, attributes=['uidNumber']):
uidNumbersList=[]
for i in conn.response:
uidNumbersList.append(i['attributes']['uidNumber'])
uidNumbersList = list(filter(lambda i: type(i) is int, uidNumbersList))
return max(uidNumbersList)
else:
return(999)
def get_user_email_array(user_dn, conn, old_email, new_email):
search_filter = '(objectClass=*)'
conn.search(user_dn, search_filter, attributes=['mail'])
emails = conn.entries[0].mail.values
for i in range(len(emails)):
if(emails[i] == old_email):
emails[i] = new_email
return(emails)
def check_secure_auth(user_dn, conn):
search_filter = '(objectClass=*)'
conn.search(user_dn, search_filter, attributes=['secureAuth'])
status = conn.entries[0].secureAuth
return(status)
def get_user_data(user_dn, conn):
search_filter = '(objectClass=*)'
conn.search(user_dn, search_filter,
attributes=['active','fakeCn','givenName','sn','uid','mail','devices','ip','lastLogin','secureAuth',
'authCode'])
data = []
data.append(conn.entries[0].active.values[0])
data.append(conn.entries[0].fakeCn.values[0])
data.append(conn.entries[0].givenName.values[0])
data.append(conn.entries[0].sn.values[0])
data.append(conn.entries[0].uid.values[0])
data.append(conn.entries[0].mail.values[0])
data.append(conn.entries[0].devices.values)
if(conn.entries[0].ip):
data.append(conn.entries[0].ip.values[0])
else:
data.append(request.environ.get('HTTP_X_REAL_IP', request.remote_addr))
#ts = conn.entries[0].lastLogin.values[0]
#ts = datetime.strptime(ts, '%Y-%m-%d %H:%M:%S%z')
#ts = datetime.strftime(t, '%Y-%m-%d %H:%M:%S')
if(conn.entries[0].lastLogin):
data.append(str(conn.entries[0].lastLogin.values[0])[:-6])
else:
data.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
data.append(conn.entries[0].secureAuth.values[0])
if(conn.entries[0].authCode):
data.append(conn.entries[0].authCode.values[0])
return(data)
def read_config():
config = ConfigParser()
config.read([path.join(BASE_DIR, 'settings.ini'), getenv('CONF_FILE', '')])
return config
CONF = read_config()
def ldaps_count():
keys = []
for i, key in enumerate(CONF.sections()):
if key == 'ldap' or key.startswith('ldap:'):
keys.append(key)
#n=len(keys)
return(len(keys))
N = ldaps_count()
def reg():
if CONF['general']['allow_registration'] == 'True':
return True
allowed = reg()
def get_dev():
ua_string = bottle.request.environ.get('HTTP_USER_AGENT')
user_agent = ua_parse(ua_string)
return str(user_agent)
def is_trusted_device(conf, user_dn):
superUser = SuperUsers(conf)
with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
d = get_dev()
try:
if not find_device(user_dn, c, d):
OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement']
c.modify(user_dn, {'devices': [( MODIFY_ADD, d )] })
c.unbind()
return True
except Exception as e:
LOG.warning(e)
return True
def update_login_info(conf, user_dn):
superUser = SuperUsers(conf)
with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
ip = request.environ.get('HTTP_X_REAL_IP', request.remote_addr)
ts = datetime.now().strftime('%Y%m%d%H%M%S')+'Z'
c.modify(user_dn, {'ip': [( MODIFY_REPLACE, str(ip) )], 'lastLogin': [( MODIFY_REPLACE, ts )] })
c.unbind()
class Error(Exception):
pass
# TEMPORAL MEMORY
class tMemory(object):
def __init__(self):
self.data = None
self.key = None
memo = tMemory()
#SESSIONS MANAGEMENT
def newSession():
class Session(object):
"""docstring for Session"""
def __init__(self):
super(Session, self).__init__()
self.data = bottle.request.environ.get('beaker.session')
self.sid = self.data.id
#localization
self.lang = self.get_lang()
global i18n
i18n = LocalizeTo(self.lang, CONF)
def get_lang(self):
if 'HTTP_ACCEPT_LANGUAGE' in bottle.request.environ:
lang = bottle.request.get('HTTP_ACCEPT_LANGUAGE')
return str(lang[:2])
else:
return CONF['locale']['lang']
def get(self):
if 'username' in self.data:
return(self.data)
else:
raise Error(i18n.msg[27])
def set(self, data):
self.active = data[0]
self.fakeCn = data[1]
self.firstname = data[2]
self.surname = data[3]
self.username = data[4]
self.mail = data[5]
self.devices = data[6]
self.ip = data[7]
self.lastLogin = data[8]
self.secureAuth = data[9]
try:
self.authCode = data[10]
except:
self.authCode = None
self.data['active'] = self.active
self.data['fakeCn'] = self.fakeCn
self.data['firstname'] = self.firstname
self.data['surname'] = self.surname
self.data['username'] = self.username
self.data['mail'] = self.mail
self.data['devices'] = self.devices
self.data['ip'] = self.ip
self.data['lastLogin'] = self.lastLogin
self.data['secureAuth'] = self.secureAuth
self.data['authCode'] = self.authCode
self.data['id'] = self.sid
def close(self):
self.data.pop('username')
def delete(self):
self.data.delete()
s=Session()
return s
if environ.get('DEBUG'):
bottle.debug(True)
# Set up logging.
logging.basicConfig(format=LOG_FORMAT)
LOG.setLevel(logging.INFO)
LOG.info("Starting ldap-python-webui %s" % VERSION)
session_opts = {
'session.type': CONF['session']['type'],
'session.cookie_expires': CONF['session']['expire'],
'session.data_dir': CONF['session']['data_dir'],
'session.auto': CONF['session']['auto']
}
class SuperUsers(object):
"""docstring for Session"""
def __init__(self, conf):
super(SuperUsers, self).__init__()
self.domain=conf['base'][conf['base'].find(","):]
self.admin_dn="cn=admin"+self.domain
self.admin_pwd=environ['LDAP_ADMIN_PASSWORD']
self.readonly_dn="cn=readonly"+self.domain
self.readonly_pwd=environ['LDAP_READONLY_PASSWORD']
superUser = SuperUsers(CONF['ldap:0'])
#app = beaker.middleware.SessionMiddleware(bottle.app(), session_opts)
app = SessionMiddleware(bottle.app(), session_opts)
bottle.TEMPLATE_PATH = [BASE_DIR]
# Set default attributes to pass into templates.
#SimpleTemplate.defaults = dict(CONF['html'])
SimpleTemplate.defaults['url'] = bottle.url
# Run bottle internal server when invoked directly (mainly for development).
if __name__ == '__main__':
bottle.run(app, **CONF['server'])
# Run bottle in application mode (in production under uWSGI server).
else:
#application = bottle.default_app()
application = app