diff --git a/app.py b/app.py
index 94e440e..9ce6c54 100644
--- a/app.py
+++ b/app.py
@@ -54,7 +54,6 @@ def get_index():
@get('/user')
def get_index():
- print('SESSION:',newSession().get())
try:
print('SESSION:',newSession().get())
return user_tpl(data=newSession().get(), str=i18n.str)
@@ -162,8 +161,11 @@ def post_user():
LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e))
return error(str(e))
+ #data=[form('username'),form('password'),newSession().get()['id'],newSession().get()['authCode']]
+
#print('N:',newSession().get()['id'])
key = cryptocode.encrypt(form('password'), newSession().get()['id'])
+ #key = cryptocode.encrypt(data, newSession().get()['id'])
print(key)
key = base64.urlsafe_b64encode(str.encode(key))
print(key)
@@ -171,7 +173,7 @@ def post_user():
try:
if(check_2fa_step1(form('username'))):
print('kk')
- logout(form('username'))
+ #logout(form('username'))
return index_tpl(two_factor_authentication=True, key=key, str=i18n.str)
except Error as e:
LOG.warning("Erabiltzailea ez da aurkitu???")
@@ -216,7 +218,7 @@ def post_user_step2(key):
print("sid:",newSession().get()['id'])
print('pwd:',password)
- #logout(newSession().get()['username'])
+ logout(newSession().get()['username'])
def error(msg):
return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
diff --git a/ç b/ç
deleted file mode 100644
index 7e22485..0000000
--- a/ç
+++ /dev/null
@@ -1,1314 +0,0 @@
-#!/usr/bin/env python3
-
-'''
-ldap-python-webui :: LDAP kudeaketarako Web Interfazea - Web UI for LDAP management.
-Copyright (C) 2022 Aitzol Berasategi - Wproject
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see .
-'''
-
-import bottle
-from bottle import get, post, static_file, request, route, template
-from bottle import SimpleTemplate
-from bottle.ext import beaker
-from configparser import ConfigParser
-from ldap3 import Server, Connection, ALL
-from ldap3 import SIMPLE, SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE, ALL_ATTRIBUTES
-from ldap3.core.exceptions import LDAPBindError, LDAPConstraintViolationResult, \
- LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError, \
- LDAPSocketOpenError, LDAPExceptionError, LDAPAttributeOrValueExistsResult, \
- LDAPNoSuchAttributeResult
-import logging
-from os import getenv, environ, path, remove
-from libs import flist, slist
-from libs.localization import *
-from libs.helper import tools
-import random
-from user_agents import parse as ua_parse
-from datetime import datetime
-import threading
-import cryptocode
-
-BASE_DIR = path.dirname(__file__)
-LOG = logging.getLogger(__name__)
-LOG_FORMAT = '%(asctime)s %(levelname)s: %(message)s'
-VERSION = '0.0.2'
-
-@get('/')
-def get_index():
- try:
- '''
- while(newSession().get()['secureAuth'] and not newSession().secure_logged_in):
- logout(newSession().get()['username'])
- '''
- return user_tpl(data=newSession().get(), str=i18n.str)
- except Exception as e:
- return index_tpl(str=i18n.str)
-
-@get('/user')
-def get_index():
- try:
- print(newSession().get())
- print(newSession().secure_logged_in)
- print(newSession().id);
- '''
- while(newSession().get()['secureAuth'] and not newSession().secure_logged_in):
- logout(newSession().get()['username'])
- '''
- return user_tpl(data=newSession().get(), str=i18n.str)
- except Exception as e:
- return index_tpl(str=i18n.str)
-
-@get('/signup')
-def get_index():
- newSession()
- if allowed:
- return signup_tpl(str=i18n.str)
- else:
- return index_tpl(alerts=[('error', i18n.msg[28], 'upDown')], str=i18n.str)
-
-@get('/change_pwd')
-def get_index():
- try:
- return change_pwd_tpl(data=newSession().get(), str=i18n.str)
- except Exception as e:
- return index_tpl(str=i18n.str)
-
-@get('/edit_fullname')
-def get_index():
- try:
- return edit_fullname_tpl(data=newSession().get(), str=i18n.str)
- except Exception as e:
- return index_tpl(str=i18n.str)
-
-@get('/edit_email')
-def get_index():
- try:
- return edit_email_tpl(data=newSession().get(), str=i18n.str)
- except Exception as e:
- return index_tpl(str=i18n.str)
-
-@get('/logs')
-def get_index():
- try:
- return logs_tpl(data=newSession().get(), str=i18n.str)
- except Exception as e:
- return index_tpl(str=i18n.str)
-
-@get('/delete')
-def get_index():
- try:
- return delete_tpl(data=newSession().get(), str=i18n.str)
- except Exception as e:
- return index_tpl(str=i18n.str)
-
-@get('/logout')
-def get_index():
-
- def error(msg):
- return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
-
- try:
- username = newSession().get()['username']
- if(username is not None):
- logout(username)
- except Error as e:
- LOG.warning("Unsuccessful attempt to log out: %s" % e)
- return error(str(e))
-
- return index_tpl(alerts=[('success', i18n.msg[0], 'fadeOut')], str=i18n.str)
-
-@get('/_2fa')
-def get_index():
- #newSession().get()
- try:
- reload(newSession().get()['username'], None, None)
- #add_auth_attribute_step1(newSession().get()['username'], None, None)
- return _2fa_tpl(data=newSession().get(), str=i18n.str)
- except Exception as e:
- return index_tpl(str=i18n.str)
-
-@get('/enable_2fa')
-@get('/disable_2fa')
-def get_index():
- try:
- return user_tpl(data=newSession().get(), str=i18n.str)
- except Exception as e:
- return index_tpl(str=i18n.str)
-
-@post('/auth')
-def post_user():
- form = request.forms.getunicode
-
- def error(msg):
- return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
-
- if len(form('username')) < 3:
- return error(i18n.msg[2])
- elif not tools.input_validation(form('username')):
- return error(i18n.msg[3])
-
- if not tools.pwd_validation(form('password')):
- return error(i18n.msg[21])
-
- username = form('username')
- password = form('password')
-
- '''
- try:
- if(check_2fa_step1(form('username'))):
- print('kk')
- #return index_tpl(two_factor_authentication=True, u=(form('username')), p=(form('password')), str=i18n.str)
- except Error as e:
- LOG.warning("Erabiltzailea ez da aurkitu???")
- '''
-
- try:
- #th = threading.Thread(target=login, args=(form('username'), form('password'),check_2fa_step1(form('username'))))
- #th.start()
- login(form('username'), form('password'))
- except Error as e:
- LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e))
- return error(str(e))
- print(newSession().id)
- key = cryptocode.encrypt(form('password'), newSession().id)
- try:
- if(check_2fa_step1(form('username'))):
- print('kk')
- return index_tpl(two_factor_authentication=True, key=key, str=i18n.str)
- except Error as e:
- LOG.warning("Erabiltzailea ez da aurkitu???")
- #th.start()
- '''
- if(not newSession().get()['secureAuth']):
- return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str)
- elif(newSession().get()['secureAuth']):
- return index_tpl(two_factor_authentication=True, str=i18n.str)
- '''
- return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str)
-
-@post('/user')
-def post_user():
- form = request.forms.getunicode
-
- def error(msg):
- return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
-
- if len(form('username')) < 3:
- return error(i18n.msg[2])
- elif not tools.input_validation(form('username')):
- return error(i18n.msg[3])
-
- if not tools.pwd_validation(form('password')):
- return error(i18n.msg[21])
-
- try:
- login(form('username'), form('password'))
- except Error as e:
- LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e))
- return error(str(e))
- '''
- if(not newSession().get()['secureAuth']):
- return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str)
- elif(newSession().get()['secureAuth']):
- return index_tpl(two_factor_authentication=True, str=i18n.str)
- '''
- return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str)
-@post('/user_step2/')
-def post_user_step2(key):
- form = request.forms.getunicode
-
- secret = newSession().get()['authCode']
- username = newSession().get()['username']
- password = cryptocode.decrypt(key, newSession().id)
-
- logout(newSession().get()['username'])
-
- def error(msg):
- return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
-
- #if not tools._2fa_validation(form('code'), newSession().get()['authCode']):
- if not tools._2fa_validation(form('code'), secret):
- #logout(newSession().get()['username'])
- #logout(username)
- return error('Kode okerra. Saio hasierak huts egin du.')
- else:
-
- try:
- login(username, password)
- except Error as e:
- LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e))
- return error(str(e))
-
- newSession().secure_logged_in = True
- return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], newSession().get()['username']), 'fadeOut' )], data=newSession().get(), str=i18n.str)
-
-@post('/signup')
-def post_signup():
-
- #ensure that i18n exists
- if 'i18n' not in globals():
- newSession()
-
- form = request.forms.getunicode
- isFake = False
-
- db = 'data/invite-codes.db'
-
- def auto_complete(arg):
- if arg == 'firstname':
- result = random.choice(flist.firstname)
- elif arg == 'surname':
- result = random.choice(slist.surname)
- return(result.capitalize())
-
- def error(msg):
- return signup_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
-
- if not tools.code_is_valid(form('invite_code'), db):
- return(error(i18n.msg[6]))
-
- if len(form('username')) < 3:
- return error(i18n.msg[2])
-
- username = form('username').lower()
- if not tools.input_validation(username):
- return error(i18n.msg[3])
-
- if len(form('firstname')) == 0:
- firstname = auto_complete('firstname')
- isFake = True
- else:
- firstname = form('firstname').title()
-
- if len(form('surname')) == 0:
- surname = auto_complete('surname')
- isFake = True
- else:
- surname = form('surname').title()
-
- email = form('email').lower()
- if not tools.email_validation(email):
- return error(i18n.msg[15])
-
- if not tools.pwd_validation(form('password')):
- return error(i18n.msg[8]) #mezua ALDATU egin behar da
- elif form('password') != form('confirm-password'):
- return error(i18n.msg[7])
-
- try:
- account_request(username, firstname, surname, form('password'), email, isFake, get_dev())
- except Error as e:
- LOG.warning("Unsuccessful attempt to create an account for %s: %s" % (form('username'), e))
- return error(str(e))
-
- try:
- tools.mark_code_as_used(form('invite_code'), db)
- except Error as e:
- LOG.warning("There was a problem verifying the invitation code, please try again later.", e)
- return error(str(e))
-
- LOG.info("New account successfully created for %s" % form('username'))
-
- return index_tpl(alerts=[('success', i18n.msg[9], 'fadeOut')], str=i18n.str)
-
-@post('/edit_fullname')
-def post_edit_fullname():
- form = request.forms.getunicode
-
- try:
- username = newSession().get()['username']
- old_firstname = newSession().get()['firstname']
- old_surname = newSession().get()['surname']
- except Error as e:
- return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str)
-
- def error(msg):
- return edit_fullname_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str)
-
- if len(form('firstname')) < 3:
- return error(i18n.msg[11])
- elif not tools.input_validation(form('firstname'), True):
- return error(i18n.msg[4])
-
- if len(form('surname')) < 3:
- return error(i18n.msg[12])
- elif not tools.input_validation(form('surname'), True):
- return error(i18n.msg[5])
-
- try:
- edit_fullname(username, old_firstname, old_surname, form('firstname').title(), form('surname').title())
- except Error as e:
- LOG.warning("Unsuccessful attempt to edit fullname for %s: %s" % (username, e))
- return error(str(e))
-
- return user_tpl(alerts=[('success', i18n.msg[13], 'fadeOut' )], data=newSession().get(), str=i18n.str)
-
-@post('/edit_email')
-def post_edit_email():
- form = request.forms.getunicode
-
- try:
- username = newSession().get()['username']
- old_email = newSession().get()['mail']
- except Error as e:
- return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str)
-
- def error(msg):
- return edit_email_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str)
-
- if not tools.email_validation(form('email')):
- return(error(i18n.msg[14]))
-
- try:
- edit_email(username, old_email, form('email').lower())
- except Error as e:
- LOG.warning("Unsuccessful attempt to change email address for %s: %s" % (username, e))
- return error(str(e))
-
- return user_tpl(alerts=[('success', i18n.msg[16], 'fadeOut' )], data=newSession().get(), str=i18n.str)
-
-@post('/enable_2fa')
-def post_enable_2fa():
-
- def error(msg):
- return _2fa_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str)
-
- try:
- if(not newSession().get()['secureAuth']):
- try:
- username=newSession().get()['username']
- add_auth_attribute_step1(username, tools.gen_secret(), action='enable')
- except Error as e:
- #add_auth_attribute_step1(newSession().get()['username'], None, None)
- reload(newSession().get()['username'], None, None)
- LOG.warning(e)
- return error('2 urratseko autentifikazioa birgaitua izan da.')
- except Error as e:
- LOG.warning(e)
- return index_tpl(alerts=[('error', e, 'fadeOut')], str=i18n.str)
-
- return _2fa_tpl(alerts=[('success', 'Bikain, 2 urratseko autentifikazioa gaitu da.', 'fadeOut')], data=newSession().get(), str=i18n.str)
-
-@post('/disable_2fa')
-def post_disable_2fa():
-
- def error(msg):
- return _2fa_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str)
-
- try:
- if(newSession().get()['secureAuth']):
- try:
- username=newSession().get()['username']
- add_auth_attribute_step1(username, None, action='disable')
- except Error as e:
- #add_auth_attribute_step1(newSession().get()['username'], None, None)
- reload(newSession().get()['username'], None, None)
- LOG.warning(e)
- return error(str(e))
- except Error as e:
- LOG.warning(e)
- return index_tpl(alerts=[('error', e, 'fadeOut')], str=i18n.str)
-
- return _2fa_tpl(alerts=[('error', '2 urratseko autentifikazioa desgaitua izan da.', 'fadeOut')], data=newSession().get(), str=i18n.str)
-
-@post('/change_pwd')
-def post_change_pwd():
- form = request.forms.getunicode
-
- try:
- username=newSession().get()['username']
- except Error as e:
- return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str)
-
- def error(msg):
- return change_pwd_tpl(username=username, alerts=[('error', msg, 'fadeOut')], str=i18n.str)
-
- if not tools.pwd_validation(form('old-password')):
- return error(i18n.msg[26])
- elif (not tools.pwd_validation(form('new-password')) or not tools.pwd_validation(form('confirm-password'))):
- return error(i18n.msg[8]) #mezua aldatu egin behar da
- elif form('new-password') != form('confirm-password'):
- return error(i18n.msg[7])
- elif form('old-password') == form('confirm-password'):
- return error(i18n.msg[17])
-
- try:
- change_passwords(username, form('old-password'), form('new-password'))
- logout(username)
- except Error as e:
- LOG.warning("Unsuccessful attempt to change password for %s: %s" % (username, e))
- return error(str(e))
-
- LOG.info("Password successfully changed for: %s" % username)
-
- return index_tpl(alerts=[('success', i18n.msg[18], 'fadeOut')], username=username, str=i18n.str)
-
-@post('/delete')
-def post_delete():
- form = request.forms.getunicode
-
- def error(msg):
- return delete_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str)
-
- try:
- username = newSession().get()['username']
- if(tools.input_validation(form('username')) and form('username').lower() == username):
- del_user(username)
- else:
- return(error(i18n.msg[19]))
-
- except Error as e:
- LOG.warning("Unsuccessful attempt to delete the account: %s" % e)
- return error(str(e))
-
- LOG.info("Account successfully deleted")
-
- return index_tpl(alerts=[('success', i18n.msg[20], 'fadeOut')], str=i18n.str)
-
-@route('/static/', name='static')
-def serve_static(filename):
- return static_file(filename, root=path.join(BASE_DIR, 'static'))
-
-@get("/static/fonts/")
-def font(filepath):
- return static_file(filepath, root="static/fonts")
-
-@get("/static/tmp/")
-def font(filepath):
- return static_file(filepath, root="static/tmp")
-
-def index_tpl(**kwargs):
- return template('index', **kwargs)
-
-def user_tpl(**kwargs):
- return template('user', **kwargs)
-
-def signup_tpl(**kwargs):
- return template('signup', **kwargs)
-
-def change_pwd_tpl(**kwargs):
- return template('change_pwd', **kwargs)
-
-def edit_email_tpl(**kwargs):
- return template('edit_email', **kwargs)
-
-def edit_fullname_tpl(**kwargs):
- return template('edit_fullname', **kwargs)
-
-def delete_tpl(**kwargs):
- return template('delete', **kwargs)
-
-def logs_tpl(**kwargs):
- return template('logs', **kwargs)
-
-def _2fa_tpl(**kwargs):
- return template('_2fa', **kwargs)
-
-def connect_ldap(conf, **kwargs):
- server = Server(host=conf['host'],
- port=conf.getint('port', None),
- use_ssl=conf.getboolean('use_ssl', False),
- connect_timeout=5)
-
- return Connection(server, raise_exceptions=True, **kwargs)
-
-#LOGIN
-def login(username, password):
-
- n = N
- for key in (key for key in CONF.sections()
- if key == 'ldap' or key.startswith('ldap:')):
-
- LOG.debug("%s is trying to logging in %s" % (username, key))
- n -= 1
- try:
- login_user(CONF[key], username, password)
- except Error as e:
- if n >=1:
- e = []
- continue
- else:
- raise e
- break
-
-def login_user(conf, *args):
- try:
- login_user_ldap(conf, *args)
-
- except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
- raise Error(i18n.msg[21])
-
- except LDAPSocketOpenError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
- except LDAPExceptionError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
-def login_user_ldap(conf, username, password):
- #set current LDAP
- superUser = SuperUsers(conf)
-
- #with connect_ldap(conf) as c:
- with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c:
- user_dn = find_user_dn(conf, c, username)
- # Note: raises LDAPUserNameIsMandatoryError when user_dn is None.
- with connect_ldap(conf, authentication=SIMPLE, user=user_dn, password=password) as c:
- c.bind()
- if is_trusted_device(conf, user_dn):
- newSession().set(get_user_data(user_dn, c))
- #new_session(user_dn, c, conf, lambda: check_2fa_step1())
- #update timestamp + ip address
- update_login_info(conf, user_dn)
- LOG.debug("%s logged in to %s" % (username, conf['base']))
- #check if exists 2fa qr image
- if(newSession().get()['secureAuth']):
- tools.gen_qr(newSession().get()['authCode'])
- #if(newSession().get()['secureAuth'] and not newSession().secure_logged_in):
- #logout(newSession().get()['username'])
-'''
-def new_session(user_dn, c, conf, two_factor_auth):
- while(two_factor_auth):
- newSession().set(get_user_data(user_dn, c))
- update_login_info(conf, user_dn)
- if(newSession().get()['secureAuth']):
- tools.gen_qr(newSession().get()['authCode'])
- LOG.debug("%s logged in to %s" % (newSession().get()['username'], conf['base']))
-'''
-#LOGOUT
-def logout(username):
- n = N
- for key in (key for key in CONF.sections()
- if key == 'ldap' or key.startswith('ldap:')):
-
- LOG.debug("Logging out %s from %s" % (username, key))
- n -= 1
- try:
- logout_user(CONF[key], username)
- except Error as e:
- if n >=1:
- e = []
- continue
- else:
- raise e
-
- break
-
-def logout_user(conf, *args):
- try:
- logout_user_ldap(conf, *args)
-
- except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
- raise Error(i18n.msg[21])
-
- except LDAPSocketOpenError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
- except LDAPExceptionError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
-def logout_user_ldap(conf, username):
- #set current LDAP
- superUser = SuperUsers(conf)
-
- #with connect_ldap(conf) as c:
- with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c:
- user_dn = find_user_dn(conf, c, username)
- c.unbind()
- #newSession().close()
- newSession().delete()
- LOG.info("%s LOGED OUT" % (username))
-
-#SIGN UP
-def account_request(username, firstname, surname, password, email, isFake, device):
- created = []
- for key in (key for key in CONF.sections()
- if key == 'ldap' or key.startswith('ldap:')):
-
- LOG.debug("Creating account for %s on %s server" % (username, key))
- try:
- new_user_account(CONF[key], username, firstname, surname, password, email, isFake, device)
- created.append(key)
- except Error as e:
- for key in reversed(created):
- LOG.info("Reverting account creation in %s for %s" % (key, username))
- try:
- #Akatsen bat gertatzen bada LDAP instantzia guztietan kontua ezabatu
- del_account(CONF[key], username)
- except Error as e2:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
- raise e
-
-def new_user_account(conf, *args):
- try:
- register(conf, *args)
-
- except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
- raise Error(i18n.msg[21])
-
- except LDAPSocketOpenError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[22])
-
- except LDAPExceptionError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
-def register(conf, username, firstname, surname, password, email, isFake, device):
-
- def to_ascii(str):
- ascii_str=""
- for c in str:
- if 0 <= ord(c) <= 127:
- ascii_str=ascii_str+c
- else:
- ascii_str=ascii_str+"X"
- return(ascii_str)
-
- #set current LDAP
- superUser = SuperUsers(conf)
-
- with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
-
- try:
- if (find_user_dn(conf,c,username) is not None):
- raise Error(i18n.msg[24])
-
- if (find_email(conf,c,email)):
- raise Error(i18n.msg[25])
-
- except Error as e:
- raise e
-
- else:
- #create new account
- uidNumber = find_uid_number(conf,c)+1
- directory = 'home/user/'+to_ascii(username)
- OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement']
- ts = datetime.now().strftime('%Y%m%d%H%M%S')+'Z'
- attributes = {'gidNumber': '501', 'uidNumber': uidNumber, 'homeDirectory': directory, 'givenName':
- firstname, 'sn': surname, 'uid' : username, 'mail': email, 'active': False, 'fakeCn': isFake,
- 'devices':device, 'ip':request.environ.get('HTTP_X_REAL_IP', request.remote_addr), 'lastLogin': ts,
- 'secureAuth': False}
- new_user_dn = "cn="+firstname+" "+surname+" - "+username+",cn=users,"+conf['base']
- c.add(dn=new_user_dn,object_class=OBJECT_CLASS, attributes=attributes)
- #create/change user password
- c.extend.standard.modify_password(new_user_dn, '', password)
- LOG.info("%s has registered on %s" % (username, conf))
-
-#EDIT FULLNAME
-def edit_fullname(username, old_firstname, old_surname, firstname, surname,):
- changed = []
-
- for key in (key for key in CONF.sections()
- if key == 'ldap' or key.startswith('ldap:')):
-
- LOG.debug("Changing fullname in %s for %s" % (key, username))
- try:
- new_fullname(CONF[key], username, firstname, surname)
- changed.append(key)
- LOG.debug("%s changed fullname on %s" % (username, key))
- except Error as e:
- for key in reversed(changed):
- LOG.info("Reverting fullname change in %s for %s" % (key, username))
- try:
- new_fullname(CONF[key], username, old_firstname, old_surname)
- except Error as e2:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
- raise e
-
-def new_fullname(conf, *args):
- try:
- update_fullname(conf, *args)
-
- except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
- raise Error(i18n.msg[26])
-
- except LDAPConstraintViolationResult as e:
- # Extract useful part of the error message (for Samba 4 / AD).
- msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
- raise Error(msg)
-
- except LDAPSocketOpenError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
- except LDAPExceptionError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
-def update_fullname(conf, username, firstname, surname):
- #set current LDAP
- superUser = SuperUsers(conf)
-
- with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
- #with connect_ldap(conf) as c:
- user_dn = find_user_dn(conf, c, username)
- c.modify(user_dn, {'givenName': [( MODIFY_REPLACE, firstname )], 'sn': [( MODIFY_REPLACE, surname )]})
-
- new_cn = "cn="+firstname+" "+ surname+" - "+ username
- c.modify_dn(user_dn, new_cn)
- new_user_dn = new_cn+",cn=users,"+conf['base']
-
- base = ",cn=users," + conf['base']
- fakeFullName = user_dn[3:-len(base)].split(" ")
-
- if(user_dn == new_user_dn):
- raise Error(i18n.msg[10])
-
- c.modify(new_user_dn, {'fakeCn': [(MODIFY_REPLACE, 'false' )]})
- newSession().set(get_user_data(new_user_dn, c))
-
-#EDIT EMAIL
-def edit_email(username, old_email, new_email):
- changed = []
-
- for key in (key for key in CONF.sections()
- if key == 'ldap' or key.startswith('ldap:')):
-
- LOG.debug("Changing email in %s for %s" % (key, username))
- try:
- new_email_address(CONF[key], username, old_email, new_email)
- changed.append(key)
- LOG.debug("%s changed email address on %s" % (username, key))
- except Error as e:
- for key in reversed(changed):
- LOG.info("Reverting email change in %s for %s" % (key, username))
- try:
- new_email_address(CONF[key], username, new_email, old_email)
- except Error as e2:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
- raise e
-
-def new_email_address(conf, *args):
- try:
- update_email_address(conf, *args)
-
- except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
- raise Error(i18n.msg[26])
-
- except LDAPConstraintViolationResult as e:
- # Extract useful part of the error message (for Samba 4 / AD).
- msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
- raise Error(msg)
-
- except LDAPSocketOpenError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
- except LDAPExceptionError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
-def update_email_address(conf, username, old_email, new_email):
- if(old_email == new_email):
- raise Error(i18n.msg[15])
-
- #set current LDAP
- superUser = SuperUsers(conf)
-
- with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
- user_dn = find_user_dn(conf, c, username)
- new_email_addresses = get_user_email_array(user_dn, c, old_email, new_email)
- c.modify(user_dn, {'mail': [( MODIFY_REPLACE, new_email_addresses )]})
- newSession().set(get_user_data(user_dn, c))
-
-# ADD AUTHCODE ATTRIBUTE - 2FA
-def add_auth_attribute_step1(username, code, action):
- changed = []
-
- for key in (key for key in CONF.sections()
- if key == 'ldap' or key.startswith('ldap:')):
-
- LOG.debug("Adding secureAuth attribute %s to %s" % (key, username))
- try:
- add_auth_attribute_step2(CONF[key], username, code, action)
- changed.append(key)
- LOG.debug("%s changed email address on %s" % (username, key))
- except Error as e:
- for key in reversed(changed):
- LOG.info("Reverting email change in %s for %s" % (key, username))
- try:
- new_email_address(CONF[key], username, new_email, old_email)
- except Error as e2:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
- raise e
-
-def add_auth_attribute_step2(conf, *args):
- try:
- add_auth_attribute_step3(conf, *args)
-
- except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
- raise Error(i18n.msg[26])
-
- except LDAPConstraintViolationResult as e:
- # Extract useful part of the error message (for Samba 4 / AD).
- msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
- raise Error(msg)
-
- except LDAPSocketOpenError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
- except LDAPNoSuchAttributeResult as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error('Dagoeneko desgaiturik zeneukan 2 urratseko autentifikazioa.')
-
- except LDAPExceptionError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
-def add_auth_attribute_step3(conf, username, code, action):
-
- #set current LDAP
- superUser = SuperUsers(conf)
-
- print(action)
- with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
- user_dn = find_user_dn(conf, c, username)
- if(action == 'enable'):
- c.modify(user_dn,{'authCode': [(MODIFY_ADD, [code])]})
- c.modify(user_dn,{'secureAuth': [MODIFY_REPLACE, [True]]})
- elif(action == 'disable'):
- c.modify(user_dn,{'authCode': [(MODIFY_DELETE, [])]})
- c.modify(user_dn,{'secureAuth': [MODIFY_REPLACE, [False]]})
- #remove file
- try:
- remove('static/tmp/'+newSession().get()['authCode']+'.png')
- except OSError as e:
- LOG.warning(str(e))
- #raise Error(e)
- pass
-
- newSession().set(get_user_data(user_dn, c))
-
-reload=add_auth_attribute_step1
-
-# CHECK SECUREAUTH
-def check_2fa_step1(username):
- changed = []
-
- for key in (key for key in CONF.sections()
- if key == 'ldap' or key.startswith('ldap:')):
-
- LOG.debug("Changing email in %s for %s" % (key, username))
- try:
- return check_2fa_step2(CONF[key], username)
- changed.append(key)
- LOG.debug("%s changed email address on %s" % (username, key))
- except Error as e:
- for key in reversed(changed):
- LOG.info("Reverting email change in %s for %s" % (key, username))
- try:
- return check_2fa_step2(CONF[key], username)
- except Error as e2:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
- raise e
-
-def check_2fa_step2(conf, *args):
- try:
- return check_2fa_step3(conf, *args)
-
- except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
- raise Error(i18n.msg[26])
-
- except LDAPConstraintViolationResult as e:
- # Extract useful part of the error message (for Samba 4 / AD).
- msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
- raise Error(msg)
-
- except LDAPSocketOpenError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
- except LDAPExceptionError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
-def check_2fa_step3(conf, username):
- #set current LDAP
- superUser = SuperUsers(conf)
-
- with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
- user_dn = find_user_dn(conf, c, username)
- secure_auth_status = check_secure_auth(user_dn, c)
- print(secure_auth_status)
- return(secure_auth_status)
- #c.modify(user_dn, {'mail': [( MODIFY_REPLACE, new_email_addresses )]})
- #newSession().set(get_user_data(user_dn, c))
-
-#CHANGE PASSWORD
-def change_passwords(username, old_pass, new_pass):
- changed = []
-
- for key in (key for key in CONF.sections()
- if key == 'ldap' or key.startswith('ldap:')):
-
- LOG.debug("Changing password in %s for %s" % (key, username))
- try:
- change_password(CONF[key], username, old_pass, new_pass)
- changed.append(key)
- LOG.debug("%s changed pwd on %s" % (username, key))
- except Error as e:
- for key in reversed(changed):
- LOG.info("Reverting password change in %s for %s" % (key, username))
- try:
- change_password(CONF[key], username, new_pass, old_pass)
- except Error as e2:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e2))
- raise e
-
-def change_password(conf, *args):
- try:
- if conf.get('type') == 'ad':
- change_password_ad(conf, *args)
- else:
- change_password_ldap(conf, *args)
-
- except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
- raise Error(i18n.msg[26])
-
- except LDAPConstraintViolationResult as e:
- # Extract useful part of the error message (for Samba 4 / AD).
- msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
- raise Error(msg)
-
- except LDAPSocketOpenError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
- except LDAPExceptionError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
-
-def change_password_ldap(conf, username, old_pass, new_pass):
- #set current LDAP
- superUser = SuperUsers(conf)
-
- with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c:
- #with connect_ldap(conf) as c:
- user_dn = find_user_dn(conf, c, username)
-
- # Note: raises LDAPUserNameIsMandatoryError when user_dn is None.
- with connect_ldap(conf, authentication=SIMPLE, user=user_dn, password=old_pass) as c:
- c.bind()
- c.extend.standard.modify_password(user_dn, old_pass, new_pass)
-
-def change_password_ad(conf, username, old_pass, new_pass):
- user = username + '@' + conf['ad_domain']
-
- with connect_ldap(conf, authentication=SIMPLE, user=user, password=old_pass) as c:
- c.bind()
- user_dn = find_user_dn(conf, c, username)
- c.extend.microsoft.modify_password(user_dn, new_pass, old_pass)
-
-#DELETE ACCOUNT
-def del_user(username):
- n = N
- for key in (key for key in CONF.sections()
- if key == 'ldap' or key.startswith('ldap:')):
- LOG.debug("Deleting account for %s from %s" % (username, key))
- n -= 1
- try:
- del_account(CONF[key], username)
- LOG.debug("Account for %s deleted on -> %s" % (username, CONF[key]))
- if(n == 0 and newSession().get()['username'] is not None):
- newSession().delete()
-
- except Error as e:
- raise e
-
-def del_account(conf, *args):
- try:
- delete(conf, *args)
-
- except LDAPSocketOpenError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
- except LDAPExceptionError as e:
- LOG.error('{}: {!s}'.format(e.__class__.__name__, e))
- raise Error(i18n.msg[23])
-
-def delete(conf, username):
- #set current LDAP
- superUser = SuperUsers(conf)
-
- with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
- try:
- user_dn = find_user_dn(conf, c, username)
- c.delete(user_dn)
-
- except Error as e:
- raise e
-
-#AUXILIARY FUNCTIONS
-#find user
-def find_user_dn(conf, conn, uid):
- search_filter = conf['search_filter'].replace('{uid}', uid)
- conn.search(conf['base'], "(%s)" % search_filter, SUBTREE)
-
- return conn.response[0]['dn'] if conn.response else None
-
-#find email
-def find_email(conf, conn, email):
- search_filter = '(uid=*)'
- if conn.search(conf['base'], search_filter, attributes=['mail']):
- for i in conn.response:
- for j in i['attributes']['mail']:
- if(j == email):
- return True
-
- return False
-
-#find device
-def find_device(user_dn, conn, device):
- search_filter = '(objectClass=*)'
- if conn.search(user_dn, search_filter, attributes=['devices']):
- for i in conn.response:
- for j in i['attributes']['devices']:
- if(j == device):
- return True
-
- return False
-
-#find highest uidNumber
-def find_uid_number(conf, conn):
- search_filter = '(uid=*)'
- if conn.search(conf['base'], search_filter, attributes=['uidNumber']):
-
- uidNumbersList=[]
- for i in conn.response:
- uidNumbersList.append(i['attributes']['uidNumber'])
-
- uidNumbersList = list(filter(lambda i: type(i) is int, uidNumbersList))
- return max(uidNumbersList)
-
- else:
- return(999)
-
-def get_user_email_array(user_dn, conn, old_email, new_email):
- search_filter = '(objectClass=*)'
- conn.search(user_dn, search_filter, attributes=['mail'])
- emails = conn.entries[0].mail.values
- for i in range(len(emails)):
- if(emails[i] == old_email):
- emails[i] = new_email
- return(emails)
-
-def check_secure_auth(user_dn, conn):
- search_filter = '(objectClass=*)'
- conn.search(user_dn, search_filter, attributes=['secureAuth'])
- status = conn.entries[0].secureAuth
- return(status)
-
-def get_user_data(user_dn, conn):
- search_filter = '(objectClass=*)'
- conn.search(user_dn, search_filter,
- attributes=['active','fakeCn','givenName','sn','uid','mail','devices','ip','lastLogin','secureAuth',
- 'authCode'])
- data = []
- data.append(conn.entries[0].active.values[0])
- data.append(conn.entries[0].fakeCn.values[0])
- data.append(conn.entries[0].givenName.values[0])
- data.append(conn.entries[0].sn.values[0])
- data.append(conn.entries[0].uid.values[0])
- data.append(conn.entries[0].mail.values[0])
- data.append(conn.entries[0].devices.values)
- data.append(conn.entries[0].ip.values[0])
- #ts = conn.entries[0].lastLogin.values[0]
- #ts = datetime.strptime(ts, '%Y-%m-%d %H:%M:%S%z')
- #ts = datetime.strftime(t, '%Y-%m-%d %H:%M:%S')
- data.append(str(conn.entries[0].lastLogin.values[0])[:-6])
- data.append(conn.entries[0].secureAuth.values[0])
- if(conn.entries[0].authCode):
- data.append(conn.entries[0].authCode.values[0])
-
- return(data)
-
-def read_config():
- config = ConfigParser()
- config.read([path.join(BASE_DIR, 'settings.ini'), getenv('CONF_FILE', '')])
- return config
-
-CONF = read_config()
-
-def ldaps_count():
- keys = []
- for i, key in enumerate(CONF.sections()):
- if key == 'ldap' or key.startswith('ldap:'):
- keys.append(key)
- #n=len(keys)
- return(len(keys))
-
-N = ldaps_count()
-
-def reg():
- if CONF['general']['allow_registration'] == 'True':
- return True
-
-allowed = reg()
-
-def get_dev():
- ua_string = bottle.request.environ.get('HTTP_USER_AGENT')
- user_agent = ua_parse(ua_string)
- return str(user_agent)
-
-def is_trusted_device(conf, user_dn):
- superUser = SuperUsers(conf)
- with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
- d = get_dev()
- try:
- if not find_device(user_dn, c, d):
- OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement']
- c.modify(user_dn, {'devices': [( MODIFY_ADD, d )] })
- '''
- if find_device(user_dn, c, 'unknown'):
- OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement']
- c.modify(user_dn, {'devices': [( MODIFY_REPLACE, d )] })
- else:
- OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement']
- c.modify(user_dn, {'devices': [( MODIFY_ADD, d )] })
- '''
- c.unbind()
- return True
- except Exception as e:
- print(e)
- return True
-
-def update_login_info(conf, user_dn):
- superUser = SuperUsers(conf)
- with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c:
- ip = request.environ.get('HTTP_X_REAL_IP', request.remote_addr)
- ts = datetime.now().strftime('%Y%m%d%H%M%S')+'Z'
- c.modify(user_dn, {'ip': [( MODIFY_REPLACE, str(ip) )], 'lastLogin': [( MODIFY_REPLACE, ts )] })
- c.unbind()
-
-class Error(Exception):
- pass
-
-#SESSIONS MANAGEMENT
-def newSession():
-
- class Session(object):
- """docstring for Session"""
- def __init__(self):
- super(Session, self).__init__()
- self.data = bottle.request.environ.get('beaker.session')
- self.id = tools.session_id()
- self.secure_logged_in = False
- self.lang = self.get_lang()
- #localization
- self.lang = self.get_lang()
- global i18n
- i18n = LocalizeTo(self.lang, CONF)
-
- def get_lang(self):
- if 'HTTP_ACCEPT_LANGUAGE' in bottle.request.environ:
- lang = bottle.request.get('HTTP_ACCEPT_LANGUAGE')
- return str(lang[:2])
- else:
- return CONF['locale']['lang']
-
- def get(self):
- if 'username' in self.data:
- return(self.data)
- else:
- raise Error(i18n.msg[27])
-
- def set(self, data):
- self.active = data[0]
- self.fakeCn = data[1]
- self.firstname = data[2]
- self.surname = data[3]
- self.username = data[4]
- self.mail = data[5]
- self.devices = data[6]
- self.ip = data[7]
- self.lastLogin = data[8]
- self.secureAuth = data[9]
- try:
- self.authCode = data[10]
- except:
- self.authCode = None
-
- self.data['active'] = self.active
- self.data['fakeCn'] = self.fakeCn
- self.data['firstname'] = self.firstname
- self.data['surname'] = self.surname
- self.data['username'] = self.username
- self.data['mail'] = self.mail
- self.data['devices'] = self.devices
- self.data['ip'] = self.ip
- self.data['lastLogin'] = self.lastLogin
- self.data['secureAuth'] = self.secureAuth
- self.data['authCode'] = self.authCode
-
- def close(self):
- self.data.pop('username')
-
- def delete(self):
- self.data.delete()
-
- s=Session()
- return s
-
-if environ.get('DEBUG'):
- bottle.debug(True)
-
-# Set up logging.
-logging.basicConfig(format=LOG_FORMAT)
-LOG.setLevel(logging.INFO)
-LOG.info("Starting ldap-python-webui %s" % VERSION)
-
-session_opts = {
- 'session.type': CONF['session']['type'],
- 'session.cookie_expires': CONF['session']['expire'],
- 'session.data_dir': CONF['session']['data_dir'],
- 'session.auto': CONF['session']['auto']
-}
-
-class SuperUsers(object):
- """docstring for Session"""
- def __init__(self, conf):
- super(SuperUsers, self).__init__()
- self.domain=conf['base'][conf['base'].find(","):]
- self.admin_dn="cn=admin"+self.domain
- self.admin_pwd=environ['LDAP_ADMIN_PASSWORD']
- self.readonly_dn="cn=readonly"+self.domain
- self.readonly_pwd=environ['LDAP_READONLY_PASSWORD']
-
-superUser = SuperUsers(CONF['ldap:0'])
-
-app = beaker.middleware.SessionMiddleware(bottle.app(), session_opts)
-
-bottle.TEMPLATE_PATH = [BASE_DIR]
-
-# Set default attributes to pass into templates.
-#SimpleTemplate.defaults = dict(CONF['html'])
-SimpleTemplate.defaults['url'] = bottle.url
-
-
-# Run bottle internal server when invoked directly (mainly for development).
-if __name__ == '__main__':
- bottle.run(app, **CONF['server'])
-# Run bottle in application mode (in production under uWSGI server).
-else:
- #application = bottle.default_app()
- application = app