982 lines
32 KiB
Python
982 lines
32 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
'''
|
||
|
ldap-python-webui :: LDAP kudeaketarako Web Interfazea - Web UI for LDAP management.
|
||
|
Copyright (C) 2022 Aitzol Berasategi - Wproject
|
||
|
|
||
|
This program is free software: you can redistribute it and/or modify
|
||
|
it under the terms of the GNU General Public License as published by
|
||
|
the Free Software Foundation, either version 3 of the License, or
|
||
|
(at your option) any later version.
|
||
|
|
||
|
This program is distributed in the hope that it will be useful,
|
||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
GNU General Public License for more details.
|
||
|
|
||
|
You should have received a copy of the GNU General Public License
|
||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
|
'''
|
||
|
|
||
|
import bottle
|
||
|
from bottle import get, post, static_file, request, route, template
|
||
|
from bottle import SimpleTemplate
|
||
|
from bottle.ext import beaker
|
||
|
from configparser import ConfigParser
|
||
|
from ldap3 import Server, Connection, ALL
|
||
|
from ldap3 import SIMPLE, SUBTREE, MODIFY_REPLACE, ALL_ATTRIBUTES
|
||
|
from ldap3.core.exceptions import LDAPBindError, LDAPConstraintViolationResult, \
|
||
|
LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError, \
|
||
|
LDAPSocketOpenError, LDAPExceptionError
|
||
|
import logging
|
||
|
from os import getenv, environ, path
|
||
|
import re
|
||
|
import sqlite3
|
||
|
from data import flist, slist
|
||
|
import random
|
||
|
import gettext
|
||
|
|
||
|
BASE_DIR = path.dirname(__file__)
|
||
|
LOG = logging.getLogger(__name__)
|
||
|
LOG_FORMAT = '%(asctime)s %(levelname)s: %(message)s'
|
||
|
VERSION = '0.0.1'
|
||
|
|
||
|
@get('/')
|
||
|
def get_index():
|
||
|
try:
|
||
|
return user_tpl(data=newSession().get(), str=i18n.str)
|
||
|
except Exception as e:
|
||
|
return index_tpl(str=i18n.str)
|
||
|
|
||
|
@get('/user')
|
||
|
def get_index():
|
||
|
try:
|
||
|
return user_tpl(data=newSession().get(), str=i18n.str)
|
||
|
except Exception as e:
|
||
|
return index_tpl(str=i18n.str)
|
||
|
|
||
|
@get('/signup')
|
||
|
def get_index():
|
||
|
newSession()
|
||
|
return signup_tpl(str=i18n.str)
|
||
|
|
||
|
@get('/change_pwd')
|
||
|
def get_index():
|
||
|
try:
|
||
|
return change_pwd_tpl(data=newSession().get(), str=i18n.str)
|
||
|
except Exception as e:
|
||
|
return index_tpl(str=i18n.str)
|
||
|
|
||
|
@get('/edit_fullname')
|
||
|
def get_index():
|
||
|
try:
|
||
|
return edit_fullname_tpl(data=newSession().get(), str=i18n.str)
|
||
|
except Exception as e:
|
||
|
return index_tpl(str=i18n.str)
|
||
|
|
||
|
@get('/edit_email')
|
||
|
def get_index():
|
||
|
try:
|
||
|
return edit_email_tpl(data=newSession().get(), str=i18n.str)
|
||
|
except Exception as e:
|
||
|
return index_tpl(str=i18n.str)
|
||
|
|
||
|
@get('/delete')
|
||
|
def get_index():
|
||
|
try:
|
||
|
return delete_tpl(data=newSession().get(), str=i18n.str)
|
||
|
except Exception as e:
|
||
|
return index_tpl(str=i18n.str)
|
||
|
|
||
|
@get('/logout')
|
||
|
def get_index():
|
||
|
|
||
|
def error(msg):
|
||
|
return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
|
||
|
|
||
|
try:
|
||
|
username = newSession().get()['username']
|
||
|
if(username is not None):
|
||
|
logout(username)
|
||
|
except Error as e:
|
||
|
LOG.warning("Unsuccessful attempt to log out: %s" % e)
|
||
|
return error(str(e))
|
||
|
|
||
|
return index_tpl(alerts=[('success', i18n.msg[0], 'fadeOut')], str=i18n.str)
|
||
|
|
||
|
@post('/user')
|
||
|
def post_user():
|
||
|
form = request.forms.getunicode
|
||
|
|
||
|
def error(msg):
|
||
|
return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
|
||
|
|
||
|
if len(form('username')) < 3:
|
||
|
return error(i18n.msg[1])
|
||
|
|
||
|
if len(form('password')) < 1:
|
||
|
return error(i18n.msg[2])
|
||
|
|
||
|
try:
|
||
|
login(form('username'), form('password'))
|
||
|
except Error as e:
|
||
|
LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e))
|
||
|
return error(str(e))
|
||
|
|
||
|
return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[3], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str)
|
||
|
|
||
|
@post('/signup')
|
||
|
def post_signup():
|
||
|
#ensure that i18n exists
|
||
|
if 'i18n' not in globals():
|
||
|
newSession()
|
||
|
|
||
|
form = request.forms.getunicode
|
||
|
isFake = False
|
||
|
|
||
|
def username_validation(e):
|
||
|
regex = r'^\w+$'
|
||
|
return(bool(re.fullmatch(regex, e)))
|
||
|
|
||
|
def email_validation(e):
|
||
|
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
|
||
|
return(bool(re.fullmatch(regex, e)))
|
||
|
|
||
|
def code_is_valid(code):
|
||
|
con = sqlite3.connect('data/invite-codes.db')
|
||
|
cur = con.cursor()
|
||
|
|
||
|
codes=[]
|
||
|
for row in cur.execute('SELECT * FROM codes WHERE valid = 1'):
|
||
|
codes.append(row[0])
|
||
|
|
||
|
return(bool(code in codes))
|
||
|
|
||
|
def mark_code_as_used(code):
|
||
|
con = sqlite3.connect('data/invite-codes.db')
|
||
|
cur = con.cursor()
|
||
|
|
||
|
cur.execute('''UPDATE codes SET valid=? WHERE code==?''',(0, code))
|
||
|
con.commit()
|
||
|
|
||
|
def auto_complete(arg):
|
||
|
if arg == 'firstname':
|
||
|
result = random.choice(flist.firstname)
|
||
|
elif arg == 'surname':
|
||
|
result = random.choice(slist.surname)
|
||
|
return(result.lower())
|
||
|
|
||
|
def error(msg):
|
||
|
return signup_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
|
||
|
|
||
|
if not code_is_valid(form('invite_code')):
|
||
|
return(error(i18n.msg[4]))
|
||
|
|
||
|
if len(form('username')) < 3:
|
||
|
return error(i18n.msg[5])
|
||
|
|
||
|
username = form('username').lower()
|
||
|
if not username_validation(username):
|
||
|
return error(i18n.msg[6])
|
||
|
|
||
|
if len(form('firstname')) == 0:
|
||
|
firstname = auto_complete('firstname')
|
||
|
isFake = True
|
||
|
else:
|
||
|
firstname = form('firstname').lower()
|
||
|
|
||
|
if len(form('surname')) == 0:
|
||
|
surname = auto_complete('surname')
|
||
|
isFake = True
|
||
|
else:
|
||
|
surname = form('surname').lower()
|
||
|
|
||
|
email = form('email').lower()
|
||
|
|
||
|
if form('password') != form('confirm-password'):
|
||
|
return error(i18n.msg[7])
|
||
|
|
||
|
if len(form('password')) < 8:
|
||
|
return error(i18n.msg[8])
|
||
|
|
||
|
try:
|
||
|
account_request(username, firstname, surname, form('password'), email, isFake)
|
||
|
except Error as e:
|
||
|
LOG.warning("Unsuccessful attempt to create an account for %s: %s" % (form('username'), e))
|
||
|
return error(str(e))
|
||
|
|
||
|
try:
|
||
|
mark_code_as_used(form('invite_code'))
|
||
|
except Error as e:
|
||
|
LOG.warning("There was a problem verifying the invitation code, please try again later.", e)
|
||
|
return error(str(e))
|
||
|
|
||
|
LOG.info("New account successfully created for %s" % form('username'))
|
||
|
|
||
|
return index_tpl(alerts=[('success', i18n.msg[9], 'fadeOut')], str=i18n.str)
|
||
|
|
||
|
@post('/edit_fullname')
|
||
|
def post_edit_fullname():
|
||
|
form = request.forms.getunicode
|
||
|
|
||
|
try:
|
||
|
username = newSession().get()['username']
|
||
|
old_firstname = newSession().get()['firstname']
|
||
|
old_surname = newSession().get()['surname']
|
||
|
except Error as e:
|
||
|
return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str)
|
||
|
|
||
|
def error(msg):
|
||
|
return edit_fullname_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str)
|
||
|
|
||
|
if len(form('firstname')) < 3:
|
||
|
return error(i18n.msg[10])
|
||
|
|
||
|
if len(form('surname')) < 3:
|
||
|
return error(i18n.msg[11])
|
||
|
|
||
|
try:
|
||
|
edit_fullname(username, old_firstname, old_surname, form('firstname').lower(), form('surname').lower())
|
||
|
except Error as e:
|
||
|
LOG.warning("Unsuccessful attempt to edit fullname for %s: %s" % (username, e))
|
||
|
return error(str(e))
|
||
|
|
||
|
return user_tpl(alerts=[('success', i18n.msg[12], 'fadeOut' )], data=newSession().get(), str=i18n.str)
|
||
|
|
||
|
@post('/edit_email')
|
||
|
def post_edit_email():
|
||
|
form = request.forms.getunicode
|
||
|
|
||
|
try:
|
||
|
username = newSession().get()['username']
|
||
|
old_email = newSession().get()['mail']
|
||
|
except Error as e:
|
||
|
return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str)
|
||
|
|
||
|
def email_is_valid(e):
|
||
|
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
|
||
|
return(bool(re.fullmatch(regex, e)))
|
||
|
|
||
|
def error(msg):
|
||
|
return edit_email_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str)
|
||
|
|
||
|
if not email_is_valid(form('email')):
|
||
|
return(error(i18n.msg[13]))
|
||
|
|
||
|
try:
|
||
|
edit_email(username, old_email, form('email').lower())
|
||
|
except Error as e:
|
||
|
LOG.warning("Unsuccessful attempt to change email addres for %s: %s" % (username, e))
|
||
|
return error(str(e))
|
||
|
|
||
|
return user_tpl(alerts=[('success', i18n.msg[14], 'fadeOut' )], data=newSession().get(), str=i18n.str)
|
||
|
|
||
|
@post('/change_pwd')
|
||
|
def post_change_pwd():
|
||
|
form = request.forms.getunicode
|
||
|
try:
|
||
|
username=newSession().get()['username']
|
||
|
except Error as e:
|
||
|
return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str)
|
||
|
|
||
|
def error(msg):
|
||
|
return change_pwd_tpl(username=username, alerts=[('error', msg, 'fadeOut')], str=i18n.str)
|
||
|
|
||
|
if form('new-password') != form('confirm-password'):
|
||
|
return error(i18n.msg[7])
|
||
|
|
||
|
if len(form('new-password')) < 8:
|
||
|
return error(i18n.msg[8])
|
||
|
|
||
|
if form('old-password') == form('confirm-password'):
|
||
|
return error(i18n.msg[15])
|
||
|
|
||
|
try:
|
||
|
change_passwords(username, form('old-password'), form('new-password'))
|
||
|
logout(username)
|
||
|
except Error as e:
|
||
|
LOG.warning("Unsuccessful attempt to change password for %s: %s" % (username, e))
|
||
|
return error(str(e))
|
||
|
|
||
|
LOG.info("Password successfully changed for: %s" % username)
|
||
|
|
||
|
return index_tpl(alerts=[('success', i18n.msg[16], 'fadeOut')], username=username, str=i18n.str)
|
||
|
|
||
|
@post('/delete')
|
||
|
def post_delete():
|
||
|
form = request.forms.getunicode
|
||
|
|
||
|
def error(msg):
|
||
|
return delete_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
|
||
|
|
||
|
try:
|
||
|
username = newSession().get()['username']
|
||
|
if(form('username').lower() == username):
|
||
|
del_user(username)
|
||
|
else:
|
||
|
return(error(i18n.msg[17]))
|
||
|
except Error as e:
|
||
|
LOG.warning("Unsuccessful attempt to delete the account: %s" % e)
|
||
|
return error(str(e))
|
||
|
|
||
|
LOG.info("Account successfully deleted")
|
||
|
|
||
|
return index_tpl(alerts=[('success', i18n.msg[18], 'fadeOut')], str=i18n.str)
|
||
|
|
||
|
@route('/static/<filename>', name='static')
|
||
|
def serve_static(filename):
|
||
|
return static_file(filename, root=path.join(BASE_DIR, 'static'))
|
||
|
|
||
|
def index_tpl(**kwargs):
|
||
|
return template('index', **kwargs)
|
||
|
|
||
|
def user_tpl(**kwargs):
|
||
|
return template('user', **kwargs)
|
||
|
|
||
|
def signup_tpl(**kwargs):
|
||
|
return template('signup', **kwargs)
|
||
|
|
||
|
def change_pwd_tpl(**kwargs):
|
||
|
return template('change_pwd', **kwargs)
|
||
|
|
||
|
def edit_email_tpl(**kwargs):
|
||
|
return template('edit_email', **kwargs)
|
||
|
|
||
|
def edit_fullname_tpl(**kwargs):
|
||
|
return template('edit_fullname', **kwargs)
|
||
|
|
||
|
def delete_tpl(**kwargs):
|
||
|
return template('delete', **kwargs)
|
||
|
|
||
|
def connect_ldap(conf, **kwargs):
|
||
|
server = Server(host=conf['host'],
|
||
|
port=conf.getint('port', None),
|
||
|
use_ssl=conf.getboolean('use_ssl', False),
|
||
|
connect_timeout=5)
|
||
|
|
||
|
return Connection(server, raise_exceptions=True, **kwargs)
|
||
|
|
||
|
#LOGIN
|
||
|
def login(username, password):
|
||
|
n = N
|
||
|
for key in (key for key in CONF.sections()
|
||
|
if key == 'ldap' or key.startswith('ldap:')):
|
||
|
|
||
|
LOG.debug("%s is trying to logging in %s" % (username, key))
|
||
|
n -= 1
|
||
|
try:
|
||
|
login_user(CONF[key], username, password)
|
||
|
except Error as e:
|
||
|
if n >=1:
|
||
|
e = []
|
||
|
continue
|
||
|
else:
|
||
|
raise e
|
||
|
|
||
|
break
|
||
|
|
||
|
def login_user(conf, *args):
|
||
|
try:
|
||
|
login_user_ldap(conf, *args)
|
||
|
|
||
|
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
|
||
|
raise Error(i18n.msg[19])
|
||
|
|
||
|
except LDAPSocketOpenError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[20])
|
||
|
|
||
|
except LDAPExceptionError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[21])
|
||
|
|
||
|
def login_user_ldap(conf, username, password):
|
||
|
#set current LDAP
|
||
|
superUser = SuperUsers(conf)
|
||
|
|
||
|
#with connect_ldap(conf) as c:
|
||
|
with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c:
|
||
|
user_dn = find_user_dn(conf, c, username)
|
||
|
|
||
|
# Note: raises LDAPUserNameIsMandatoryError when user_dn is None.
|
||
|
with connect_ldap(conf, authentication=SIMPLE, user=user_dn, password=password) as c:
|
||
|
c.bind()
|
||
|
newSession().set(get_user_data(user_dn, c))
|
||
|
LOG.debug("%s logged in to %s" % (username, conf['base']))
|
||
|
|
||
|
#LOGOUT
|
||
|
def logout(username):
|
||
|
n = N
|
||
|
for key in (key for key in CONF.sections()
|
||
|
if key == 'ldap' or key.startswith('ldap:')):
|
||
|
|
||
|
LOG.debug("Logging out %s from %s" % (username, key))
|
||
|
n -= 1
|
||
|
try:
|
||
|
logout_user(CONF[key], username)
|
||
|
except Error as e:
|
||
|
if n >=1:
|
||
|
e = []
|
||
|
continue
|
||
|
else:
|
||
|
raise e
|
||
|
|
||
|
break
|
||
|
|
||
|
def logout_user(conf, *args):
|
||
|
try:
|
||
|
logout_user_ldap(conf, *args)
|
||
|
|
||
|
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
|
||
|
raise Error(i18n.msg[19])
|
||
|
|
||
|
except LDAPSocketOpenError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[20])
|
||
|
|
||
|
except LDAPExceptionError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[21])
|
||
|
|
||
|
def logout_user_ldap(conf, username):
|
||
|
#set current LDAP
|
||
|
superUser = SuperUsers(conf)
|
||
|
|
||
|
#with connect_ldap(conf) as c:
|
||
|
with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c:
|
||
|
user_dn = find_user_dn(conf, c, username)
|
||
|
c.unbind()
|
||
|
#newSession().close()
|
||
|
newSession().delete()
|
||
|
LOG.info("%s LOGED OUT" % (username))
|
||
|
|
||
|
#SIGN UP
|
||
|
def account_request(username, firstname, surname, password, email, isFake):
|
||
|
created = []
|
||
|
for key in (key for key in CONF.sections()
|
||
|
if key == 'ldap' or key.startswith('ldap:')):
|
||
|
|
||
|
LOG.debug("Creating account for %s on %s server" % (username, key))
|
||
|
try:
|
||
|
new_user_account(CONF[key], username, firstname, surname, password, email, isFake)
|
||
|
created.append(key)
|
||
|
except Error as e:
|
||
|
for key in reversed(created):
|
||
|
LOG.info("Reverting account creation in %s for %s" % (key, username))
|
||
|
try:
|
||
|
#Akatsen bat gertatzen bada LDAP instantzia guztietan kontua ezabatu
|
||
|
del_account(CONF[key], username)
|
||
|
except Error as e2:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
|
||
|
raise e
|
||
|
|
||
|
def new_user_account(conf, *args):
|
||
|
try:
|
||
|
register(conf, *args)
|
||
|
|
||
|
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
|
||
|
raise Error(i18n.msg[19])
|
||
|
|
||
|
except LDAPSocketOpenError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[20])
|
||
|
|
||
|
except LDAPExceptionError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[21])
|
||
|
|
||
|
def register(conf, username, firstname, surname, password, email, isFake):
|
||
|
|
||
|
def to_ascii(str):
|
||
|
ascii_str=""
|
||
|
for c in str:
|
||
|
if 0 <= ord(c) <= 127:
|
||
|
ascii_str=ascii_str+c
|
||
|
else:
|
||
|
ascii_str=ascii_str+"X"
|
||
|
return(ascii_str)
|
||
|
|
||
|
#set current LDAP
|
||
|
superUser = SuperUsers(conf)
|
||
|
|
||
|
with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
|
||
|
|
||
|
try:
|
||
|
if (find_user_dn(conf,c,username) is not None):
|
||
|
raise Error(i18n.msg[22])
|
||
|
|
||
|
if (find_email(conf,c,email)):
|
||
|
raise Error(i18n.msg[23])
|
||
|
|
||
|
except Error as e:
|
||
|
raise e
|
||
|
|
||
|
else:
|
||
|
#create new account
|
||
|
uidNumber = find_uid_number(conf,c)+1
|
||
|
directory = 'home/user/'+to_ascii(username)
|
||
|
OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement']
|
||
|
attributes = {'gidNumber': '501', 'uidNumber': uidNumber, 'homeDirectory': directory, 'givenName': firstname, 'sn': surname, 'uid' : username, 'mail': email, 'active': False, 'fakeCn': isFake}
|
||
|
new_user_dn = "cn="+firstname+" "+ surname+",cn=users,"+conf['base']
|
||
|
c.add(dn=new_user_dn,object_class=OBJECT_CLASS, attributes=attributes)
|
||
|
#create/change user password
|
||
|
c.extend.standard.modify_password(new_user_dn, '', password)
|
||
|
LOG.info("%s has registered on %s" % (username, conf))
|
||
|
|
||
|
#EDIT FULLNAME
|
||
|
def edit_fullname(username, old_firstname, old_surname, firstname, surname,):
|
||
|
changed = []
|
||
|
|
||
|
for key in (key for key in CONF.sections()
|
||
|
if key == 'ldap' or key.startswith('ldap:')):
|
||
|
|
||
|
LOG.debug("Changing fullname in %s for %s" % (key, username))
|
||
|
try:
|
||
|
new_fullname(CONF[key], username, firstname, surname)
|
||
|
changed.append(key)
|
||
|
LOG.debug("%s changed fullname on %s" % (username, key))
|
||
|
except Error as e:
|
||
|
for key in reversed(changed):
|
||
|
LOG.info("Reverting fullname change in %s for %s" % (key, username))
|
||
|
try:
|
||
|
new_fullname(CONF[key], username, old_firstname, old_surname)
|
||
|
except Error as e2:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
|
||
|
raise e
|
||
|
|
||
|
def new_fullname(conf, *args):
|
||
|
try:
|
||
|
update_fullname(conf, *args)
|
||
|
|
||
|
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
|
||
|
raise Error(i18n.msg[24])
|
||
|
|
||
|
except LDAPConstraintViolationResult as e:
|
||
|
# Extract useful part of the error message (for Samba 4 / AD).
|
||
|
msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
|
||
|
raise Error(msg)
|
||
|
|
||
|
except LDAPSocketOpenError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[20])
|
||
|
|
||
|
except LDAPExceptionError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[21])
|
||
|
|
||
|
def update_fullname(conf, username, firstname, surname):
|
||
|
#set current LDAP
|
||
|
superUser = SuperUsers(conf)
|
||
|
|
||
|
with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
|
||
|
#with connect_ldap(conf) as c:
|
||
|
user_dn = find_user_dn(conf, c, username)
|
||
|
c.modify(user_dn, {'givenName': [( MODIFY_REPLACE, firstname )], 'sn': [( MODIFY_REPLACE, surname )]})
|
||
|
|
||
|
new_cn = "cn="+firstname+" "+ surname
|
||
|
c.modify_dn(user_dn, new_cn)
|
||
|
new_user_dn = new_cn+",cn=users,"+conf['base']
|
||
|
|
||
|
base = ",cn=users," + conf['base']
|
||
|
fakeFullName = user_dn[3:-len(base)].split(" ")
|
||
|
|
||
|
if(user_dn == new_user_dn):
|
||
|
raise Error('Izen-abizenak ez dira aldatu.')
|
||
|
|
||
|
c.modify(new_user_dn, {'fakeCn': [(MODIFY_REPLACE, 'false' )]})
|
||
|
newSession().set(get_user_data(new_user_dn, c))
|
||
|
|
||
|
#EDIT EMAIL
|
||
|
def edit_email(username, old_email, new_email):
|
||
|
changed = []
|
||
|
|
||
|
for key in (key for key in CONF.sections()
|
||
|
if key == 'ldap' or key.startswith('ldap:')):
|
||
|
|
||
|
LOG.debug("Changing email in %s for %s" % (key, username))
|
||
|
try:
|
||
|
new_email_address(CONF[key], username, old_email, new_email)
|
||
|
changed.append(key)
|
||
|
LOG.debug("%s changed email address on %s" % (username, key))
|
||
|
except Error as e:
|
||
|
for key in reversed(changed):
|
||
|
LOG.info("Reverting email change in %s for %s" % (key, username))
|
||
|
try:
|
||
|
new_email_address(CONF[key], username, new_email, old_email)
|
||
|
except Error as e2:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
|
||
|
raise e
|
||
|
|
||
|
def new_email_address(conf, *args):
|
||
|
try:
|
||
|
update_email_address(conf, *args)
|
||
|
|
||
|
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
|
||
|
raise Error(i18n.msg[24])
|
||
|
|
||
|
except LDAPConstraintViolationResult as e:
|
||
|
# Extract useful part of the error message (for Samba 4 / AD).
|
||
|
msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
|
||
|
raise Error(msg)
|
||
|
|
||
|
except LDAPSocketOpenError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[20])
|
||
|
|
||
|
except LDAPExceptionError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[21])
|
||
|
|
||
|
|
||
|
def update_email_address(conf, username, old_email, new_email):
|
||
|
if(old_email == new_email):
|
||
|
raise Error('Email helbidea ez da aldatu.')
|
||
|
|
||
|
#set current LDAP
|
||
|
superUser = SuperUsers(conf)
|
||
|
|
||
|
with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
|
||
|
user_dn = find_user_dn(conf, c, username)
|
||
|
new_email_addresses = get_user_email_array(user_dn, c, old_email, new_email)
|
||
|
c.modify(user_dn, {'mail': [( MODIFY_REPLACE, new_email_addresses )]})
|
||
|
newSession().set(get_user_data(user_dn, c))
|
||
|
|
||
|
#CHANGE PASSWORD
|
||
|
def change_passwords(username, old_pass, new_pass):
|
||
|
changed = []
|
||
|
|
||
|
for key in (key for key in CONF.sections()
|
||
|
if key == 'ldap' or key.startswith('ldap:')):
|
||
|
|
||
|
LOG.debug("Changing password in %s for %s" % (key, username))
|
||
|
try:
|
||
|
change_password(CONF[key], username, old_pass, new_pass)
|
||
|
changed.append(key)
|
||
|
LOG.debug("%s changed pwd on %s" % (username, key))
|
||
|
except Error as e:
|
||
|
for key in reversed(changed):
|
||
|
LOG.info("Reverting password change in %s for %s" % (key, username))
|
||
|
try:
|
||
|
change_password(CONF[key], username, new_pass, old_pass)
|
||
|
except Error as e2:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
|
||
|
raise e
|
||
|
|
||
|
def change_password(conf, *args):
|
||
|
try:
|
||
|
if conf.get('type') == 'ad':
|
||
|
change_password_ad(conf, *args)
|
||
|
else:
|
||
|
change_password_ldap(conf, *args)
|
||
|
|
||
|
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
|
||
|
raise Error(i18n.msg[24])
|
||
|
|
||
|
except LDAPConstraintViolationResult as e:
|
||
|
# Extract useful part of the error message (for Samba 4 / AD).
|
||
|
msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
|
||
|
raise Error(msg)
|
||
|
|
||
|
except LDAPSocketOpenError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[20])
|
||
|
|
||
|
except LDAPExceptionError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[21])
|
||
|
|
||
|
|
||
|
def change_password_ldap(conf, username, old_pass, new_pass):
|
||
|
#set current LDAP
|
||
|
superUser = SuperUsers(conf)
|
||
|
|
||
|
with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c:
|
||
|
#with connect_ldap(conf) as c:
|
||
|
user_dn = find_user_dn(conf, c, username)
|
||
|
|
||
|
# Note: raises LDAPUserNameIsMandatoryError when user_dn is None.
|
||
|
with connect_ldap(conf, authentication=SIMPLE, user=user_dn, password=old_pass) as c:
|
||
|
c.bind()
|
||
|
c.extend.standard.modify_password(user_dn, old_pass, new_pass)
|
||
|
|
||
|
def change_password_ad(conf, username, old_pass, new_pass):
|
||
|
user = username + '@' + conf['ad_domain']
|
||
|
|
||
|
with connect_ldap(conf, authentication=SIMPLE, user=user, password=old_pass) as c:
|
||
|
c.bind()
|
||
|
user_dn = find_user_dn(conf, c, username)
|
||
|
c.extend.microsoft.modify_password(user_dn, new_pass, old_pass)
|
||
|
|
||
|
#DELETE ACCOUNT
|
||
|
def del_user(username):
|
||
|
n = N
|
||
|
for key in (key for key in CONF.sections()
|
||
|
if key == 'ldap' or key.startswith('ldap:')):
|
||
|
LOG.debug("Deleting account for %s from %s" % (username, key))
|
||
|
n -= 1
|
||
|
try:
|
||
|
del_account(CONF[key], username)
|
||
|
LOG.debug("Account for %s deleted on -> %s" % (username, CONF[key]))
|
||
|
if(n == 0 and newSession().get()['username'] is not None):
|
||
|
newSession().delete()
|
||
|
|
||
|
except Error as e:
|
||
|
raise e
|
||
|
|
||
|
def del_account(conf, *args):
|
||
|
try:
|
||
|
delete(conf, *args)
|
||
|
|
||
|
except LDAPSocketOpenError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[20])
|
||
|
|
||
|
except LDAPExceptionError as e:
|
||
|
LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
|
||
|
raise Error(i18n.msg[21])
|
||
|
|
||
|
def delete(conf, username):
|
||
|
#set current LDAP
|
||
|
superUser = SuperUsers(conf)
|
||
|
|
||
|
with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
|
||
|
try:
|
||
|
user_dn = find_user_dn(conf, c, username)
|
||
|
c.delete(user_dn)
|
||
|
|
||
|
except Error as e:
|
||
|
raise e
|
||
|
|
||
|
#AUXILIARY FUNCTIONS
|
||
|
#find user
|
||
|
def find_user_dn(conf, conn, uid):
|
||
|
search_filter = conf['search_filter'].replace('{uid}', uid)
|
||
|
conn.search(conf['base'], "(%s)" % search_filter, SUBTREE)
|
||
|
|
||
|
return conn.response[0]['dn'] if conn.response else None
|
||
|
|
||
|
#find email
|
||
|
def find_email(conf, conn, email):
|
||
|
search_filter = '(uid=*)'
|
||
|
if conn.search(conf['base'], search_filter, attributes=['mail']):
|
||
|
for i in conn.response:
|
||
|
for j in i['attributes']['mail']:
|
||
|
if(j == email):
|
||
|
return True
|
||
|
|
||
|
return False
|
||
|
|
||
|
#find highest uidNumber
|
||
|
def find_uid_number(conf, conn):
|
||
|
search_filter = '(uid=*)'
|
||
|
if conn.search(conf['base'], search_filter, attributes=['uidNumber']):
|
||
|
|
||
|
uidNumbersList=[]
|
||
|
for i in conn.response:
|
||
|
uidNumbersList.append(i['attributes']['uidNumber'])
|
||
|
|
||
|
return max(uidNumbersList)
|
||
|
|
||
|
else:
|
||
|
return(999)
|
||
|
|
||
|
def get_user_email_array(user_dn, conn, old_email, new_email):
|
||
|
search_filter = '(objectClass=*)'
|
||
|
conn.search(user_dn, search_filter, attributes=['mail'])
|
||
|
emails = conn.entries[0].mail.values
|
||
|
for i in range(len(emails)):
|
||
|
if(emails[i] == old_email):
|
||
|
emails[i] = new_email
|
||
|
return(emails)
|
||
|
|
||
|
def get_user_data(user_dn, conn):
|
||
|
search_filter = '(objectClass=*)'
|
||
|
conn.search(user_dn, search_filter, attributes=['active','fakeCn','givenName','sn','uid','mail'])
|
||
|
data = []
|
||
|
data.append(conn.entries[0].active.values[0])
|
||
|
data.append(conn.entries[0].fakeCn.values[0])
|
||
|
data.append(conn.entries[0].givenName.values[0])
|
||
|
data.append(conn.entries[0].sn.values[0])
|
||
|
data.append(conn.entries[0].uid.values[0])
|
||
|
data.append(conn.entries[0].mail.values[0])
|
||
|
return(data)
|
||
|
|
||
|
def read_config():
|
||
|
config = ConfigParser()
|
||
|
config.read([path.join(BASE_DIR, 'settings.ini'), getenv('CONF_FILE', '')])
|
||
|
return config
|
||
|
|
||
|
CONF = read_config()
|
||
|
|
||
|
def ldaps_count():
|
||
|
keys = []
|
||
|
for i, key in enumerate(CONF.sections()):
|
||
|
if key == 'ldap' or key.startswith('ldap:'):
|
||
|
keys.append(key)
|
||
|
#n=len(keys)
|
||
|
return(len(keys))
|
||
|
|
||
|
N = ldaps_count()
|
||
|
|
||
|
class Error(Exception):
|
||
|
pass
|
||
|
|
||
|
#SESSIONS MANAGEMENT
|
||
|
def newSession():
|
||
|
|
||
|
class Session(object):
|
||
|
"""docstring for Session"""
|
||
|
def __init__(self):
|
||
|
super(Session, self).__init__()
|
||
|
self.data = bottle.request.environ.get('beaker.session')
|
||
|
self.lang = self.get_lang()
|
||
|
#localization
|
||
|
global i18n
|
||
|
i18n = LocalizeTo(self.lang)
|
||
|
|
||
|
def get_lang(self):
|
||
|
if 'HTTP_ACCEPT_LANGUAGE' in bottle.request.environ:
|
||
|
lang = bottle.request.get('HTTP_ACCEPT_LANGUAGE')
|
||
|
return str(lang[:2])
|
||
|
else:
|
||
|
return CONF['locale']['lang']
|
||
|
|
||
|
def get(self):
|
||
|
if 'username' in self.data:
|
||
|
return(self.data)
|
||
|
else:
|
||
|
raise Error(i18n.msg[25])
|
||
|
|
||
|
def set(self, data):
|
||
|
self.active = data[0]
|
||
|
self.fakeCn = data[1]
|
||
|
self.firstname = data[2]
|
||
|
self.surname = data[3]
|
||
|
self.username = data[4]
|
||
|
self.mail = data[5]
|
||
|
|
||
|
self.data['active'] = self.active
|
||
|
self.data['fakeCn'] = self.fakeCn
|
||
|
self.data['firstname'] = self.firstname
|
||
|
self.data['surname'] = self.surname
|
||
|
self.data['username'] = self.username
|
||
|
self.data['mail'] = self.mail
|
||
|
|
||
|
def close(self):
|
||
|
self.data.pop('username')
|
||
|
|
||
|
def delete(self):
|
||
|
self.data.delete()
|
||
|
|
||
|
s=Session()
|
||
|
return s
|
||
|
|
||
|
class LocalizeTo(object):
|
||
|
"""docstring for Session"""
|
||
|
def __init__(self, lang):
|
||
|
super(LocalizeTo, self).__init__()
|
||
|
translate = gettext.translation('base', localedir=CONF['locale']['dir'], languages=[lang])
|
||
|
translate.install()
|
||
|
_ = translate.gettext
|
||
|
|
||
|
#generic strings
|
||
|
str_00 = _("User")
|
||
|
str_01 = _("Username")
|
||
|
str_02 = _("Firstname")
|
||
|
str_03 = _("Surname")
|
||
|
str_04 = _("Password")
|
||
|
str_05 = _("Old password")
|
||
|
str_06 = _("New password")
|
||
|
str_07 = _("Confirm password")
|
||
|
str_08 = _("Email")
|
||
|
str_09 = _("edit")
|
||
|
str_10 = _("Login")
|
||
|
str_11 = _("Logout")
|
||
|
str_12 = _("Delete")
|
||
|
str_13 = _("Sign Up")
|
||
|
str_14 = _("Back")
|
||
|
str_15 = _("Update")
|
||
|
str_16 = _("Or Sign In")
|
||
|
str_17 = _("Or Sign Up")
|
||
|
str_18 = _("Invite code")
|
||
|
str_19 = _("Edit your fullname")
|
||
|
str_20 = _("Edit your email")
|
||
|
str_21 = _("Change your password")
|
||
|
str_22 = _("Delete your account")
|
||
|
str_23 = _("Welcome")
|
||
|
|
||
|
#messages
|
||
|
msg_00 = _("The session was closed.")
|
||
|
msg_01 = _("Username must be at least 3 characters long!")
|
||
|
msg_02 = _("Please enter your password!")
|
||
|
msg_03 = _("Welcome")
|
||
|
msg_04 = _("The code is invalid or has expired.")
|
||
|
msg_05 = _("A bit short, don't you think?!")
|
||
|
msg_06 = _("Not allowed characters for the username field.")
|
||
|
msg_07 = _("Passwords do not match!")
|
||
|
msg_08 = _("Password must be at least 8 characters long!")
|
||
|
msg_09 = _("Congratulations, your account has been created!")
|
||
|
msg_10 = _("Your firstname is a bit short, don't you think?")
|
||
|
msg_11 = _("Your surname is a bit short, don't you think?")
|
||
|
msg_12 = _("Your first and last name have been successfully updated.")
|
||
|
msg_13 = _("Invalid email address. Please try again.")
|
||
|
msg_14 = _("Your email has been successfully updated.")
|
||
|
msg_15 = _("The password entered is the same as the current password.")
|
||
|
msg_16 = _("Password has been changed!")
|
||
|
msg_17 = _("Please, type your username for account deletion.")
|
||
|
msg_18 = _("Account successfully deleted!")
|
||
|
msg_19 = _("Username or password is incorrect!")
|
||
|
msg_20 = _("Unable to connect to the remote server.")
|
||
|
msg_21 = _("Encountered an unexpected error while communicating with the remote server.")
|
||
|
msg_22 = _("User already exists.")
|
||
|
msg_23 = _("Email already exists.")
|
||
|
msg_24 = _("Forgot your password? Please try again.")
|
||
|
msg_25 = _("The session has expired.")
|
||
|
|
||
|
self.str = {'usr':str_00, 'usrn':str_01, 'fn':str_02, 'sn':str_03, 'pwd':str_04, 'old-pwd':str_05, 'new-pwd':str_06, 'conf-pwd':str_07, 'email':str_08, 'edit':str_09, 'login':str_10, 'log-out':str_11, 'del':str_12, 'sign-up':str_13, 'back':str_14, 'update':str_15, 'or-sign-in':str_16, 'or-sign-up':str_17, 'inv-code':str_18, 'edit-fn':str_19, 'edit-email':str_20, 'ch-pwd':str_21, 'del-account':str_22, 'welcome':str_23}
|
||
|
self.msg = (msg_00, msg_01, msg_02, msg_03, msg_04, msg_05, msg_06, msg_07, msg_08, msg_09, msg_10, msg_11, msg_12, msg_13, msg_14, msg_15, msg_16, msg_17, msg_18, msg_19, msg_20, msg_21, msg_22, msg_23, msg_24, msg_25)
|
||
|
|
||
|
if environ.get('DEBUG'):
|
||
|
bottle.debug(True)
|
||
|
|
||
|
# Set up logging.
|
||
|
logging.basicConfig(format=LOG_FORMAT)
|
||
|
LOG.setLevel(logging.INFO)
|
||
|
LOG.info("Starting ldap-python-webui %s" % VERSION)
|
||
|
|
||
|
session_opts = {
|
||
|
'session.type': CONF['session']['type'],
|
||
|
'session.cookie_expires': CONF['session']['expire'],
|
||
|
'session.data_dir': CONF['session']['data_dir'],
|
||
|
'session.auto': CONF['session']['auto']
|
||
|
}
|
||
|
|
||
|
class SuperUsers(object):
|
||
|
"""docstring for Session"""
|
||
|
def __init__(self, conf):
|
||
|
super(SuperUsers, self).__init__()
|
||
|
self.domain=conf['base'][conf['base'].find(","):]
|
||
|
self.admin_dn="cn=admin"+self.domain
|
||
|
self.admin_pwd=environ['LDAP_ADMIN_PASSWORD']
|
||
|
self.readonly_dn="cn=readonly"+self.domain
|
||
|
self.readonly_pwd=environ['LDAP_READONLY_PASSWORD']
|
||
|
|
||
|
superUser = SuperUsers(CONF['ldap:0'])
|
||
|
|
||
|
app = beaker.middleware.SessionMiddleware(bottle.app(), session_opts)
|
||
|
|
||
|
bottle.TEMPLATE_PATH = [BASE_DIR]
|
||
|
|
||
|
# Set default attributes to pass into templates.
|
||
|
#SimpleTemplate.defaults = dict(CONF['html'])
|
||
|
SimpleTemplate.defaults['url'] = bottle.url
|
||
|
|
||
|
|
||
|
# Run bottle internal server when invoked directly (mainly for development).
|
||
|
if __name__ == '__main__':
|
||
|
bottle.run(app, **CONF['server'])
|
||
|
# Run bottle in application mode (in production under uWSGI server).
|
||
|
else:
|
||
|
#application = bottle.default_app()
|
||
|
application = app
|