From 078af33e77ce08c675b6fba0ce24a36dc2363b54 Mon Sep 17 00:00:00 2001 From: aitzol Date: Mon, 20 Nov 2023 11:45:49 +0100 Subject: [PATCH] 2fa-0.8 --- app.py | 152 ++++++++++++++++++++++++++++++++++++++++++++-- index.tpl | 21 +++++-- index_ezabatu.tpl | 46 ++++++++++++++ libs/helper.py | 5 +- uwsgi.ini.example | 1 + 5 files changed, 211 insertions(+), 14 deletions(-) create mode 100644 index_ezabatu.tpl diff --git a/app.py b/app.py index cad796b..844d178 100644 --- a/app.py +++ b/app.py @@ -46,6 +46,9 @@ VERSION = '0.0.2' @get('/') def get_index(): try: + while(newSession().get()['secureAuth'] and not newSession().secure_logged_in): + logout(newSession().get()['username']) + return user_tpl(data=newSession().get(), str=i18n.str) except Exception as e: return index_tpl(str=i18n.str) @@ -54,6 +57,10 @@ def get_index(): def get_index(): try: print(newSession().get()) + print(newSession().secure_logged_in) + while(newSession().get()['secureAuth'] and not newSession().secure_logged_in): + logout(newSession().get()['username']) + return user_tpl(data=newSession().get(), str=i18n.str) except Exception as e: return index_tpl(str=i18n.str) @@ -135,6 +142,56 @@ def get_index(): except Exception as e: return index_tpl(str=i18n.str) +@post('/auth') +def post_user(): + form = request.forms.getunicode + + def error(msg): + return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) + + if len(form('username')) < 3: + return error(i18n.msg[2]) + elif not tools.input_validation(form('username')): + return error(i18n.msg[3]) + + if not tools.pwd_validation(form('password')): + return error(i18n.msg[21]) + + username = form('username') + password = form('password') + + ''' + try: + if(check_2fa_step1(form('username'))): + print('kk') + #return index_tpl(two_factor_authentication=True, u=(form('username')), p=(form('password')), str=i18n.str) + except Error as e: + LOG.warning("Erabiltzailea ez da aurkitu???") + ''' + + try: + #th = threading.Thread(target=login, args=(form('username'), form('password'))) + #th.start() + login(form('username'), form('password')) + except Error as e: + LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e)) + return error(str(e)) + + try: + if(check_2fa_step1(form('username'))): + print('kk') + return index_tpl(two_factor_authentication=True, str=i18n.str) + except Error as e: + LOG.warning("Erabiltzailea ez da aurkitu???") + + ''' + if(not newSession().get()['secureAuth']): + return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) + elif(newSession().get()['secureAuth']): + return index_tpl(two_factor_authentication=True, str=i18n.str) + ''' + return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) + @post('/user') def post_user(): form = request.forms.getunicode @@ -155,11 +212,26 @@ def post_user(): except Error as e: LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e)) return error(str(e)) - - #if 2fa not chekced || (2fa checked & success) + ''' + if(not newSession().get()['secureAuth']): + return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) + elif(newSession().get()['secureAuth']): + return index_tpl(two_factor_authentication=True, str=i18n.str) + ''' return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) - #elif 2fa checked - #return _2fa_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) +@post('/user_step2') +def post_user_step2(): + form = request.forms.getunicode + + def error(msg): + return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) + + if not tools._2fa_validation(form('code'), newSession().get()['authCode']): + logout(newSession().get()['username']) + return error('Kode okerra. Saio hasierak huts egin du.') + + newSession.secure_logged_in = True + return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], newSession().get()['username']), 'fadeOut' )], data=newSession().get(), str=i18n.str) @post('/signup') def post_signup(): @@ -477,13 +549,22 @@ def login_user_ldap(conf, username, password): c.bind() if is_trusted_device(conf, user_dn): newSession().set(get_user_data(user_dn, c)) + #new_session(user_dn, c, conf, lambda: check_2fa_step1()) #update timestamp + ip address update_login_info(conf, user_dn) #check if exists 2fa qr image if(newSession().get()['secureAuth']): tools.gen_qr(newSession().get()['authCode']) LOG.debug("%s logged in to %s" % (username, conf['base'])) - +''' +def new_session(user_dn, c, conf, two_factor_auth): + while(two_factor_auth): + newSession().set(get_user_data(user_dn, c)) + update_login_info(conf, user_dn) + if(newSession().get()['secureAuth']): + tools.gen_qr(newSession().get()['authCode']) + LOG.debug("%s logged in to %s" % (newSession().get()['username'], conf['base'])) +''' #LOGOUT def logout(username): n = N @@ -795,6 +876,59 @@ def add_auth_attribute_step3(conf, username, code, action): reload=add_auth_attribute_step1 +# CHECK SECUREAUTH +def check_2fa_step1(username): + changed = [] + + for key in (key for key in CONF.sections() + if key == 'ldap' or key.startswith('ldap:')): + + LOG.debug("Changing email in %s for %s" % (key, username)) + try: + return check_2fa_step2(CONF[key], username) + changed.append(key) + LOG.debug("%s changed email address on %s" % (username, key)) + except Error as e: + for key in reversed(changed): + LOG.info("Reverting email change in %s for %s" % (key, username)) + try: + return check_2fa_step2(CONF[key], username) + except Error as e2: + LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) + raise e + +def check_2fa_step2(conf, *args): + try: + return check_2fa_step3(conf, *args) + + except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): + raise Error(i18n.msg[26]) + + except LDAPConstraintViolationResult as e: + # Extract useful part of the error message (for Samba 4 / AD). + msg = e.message.split('check_password_restrictions: ')[-1].capitalize() + raise Error(msg) + + except LDAPSocketOpenError as e: + LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) + raise Error(i18n.msg[23]) + + except LDAPExceptionError as e: + LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) + raise Error(i18n.msg[23]) + +def check_2fa_step3(conf, username): + #set current LDAP + superUser = SuperUsers(conf) + + with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: + user_dn = find_user_dn(conf, c, username) + secure_auth_status = check_secure_auth(user_dn, c) + print(secure_auth_status) + return(secure_auth_status) + #c.modify(user_dn, {'mail': [( MODIFY_REPLACE, new_email_addresses )]}) + #newSession().set(get_user_data(user_dn, c)) + #CHANGE PASSWORD def change_passwords(username, old_pass, new_pass): changed = [] @@ -955,6 +1089,12 @@ def get_user_email_array(user_dn, conn, old_email, new_email): emails[i] = new_email return(emails) +def check_secure_auth(user_dn, conn): + search_filter = '(objectClass=*)' + conn.search(user_dn, search_filter, attributes=['secureAuth']) + status = conn.entries[0].secureAuth + return(status) + def get_user_data(user_dn, conn): search_filter = '(objectClass=*)' conn.search(user_dn, search_filter, @@ -1048,6 +1188,8 @@ def newSession(): def __init__(self): super(Session, self).__init__() self.data = bottle.request.environ.get('beaker.session') + #self.logged_in = False + self.secure_logged_in = False self.lang = self.get_lang() #localization self.lang = self.get_lang() diff --git a/index.tpl b/index.tpl index 393e37b..cb1592d 100644 --- a/index.tpl +++ b/index.tpl @@ -14,13 +14,22 @@

{{ str['login'] }}

+ + %try: + %if two_factor_authentication: +
+ + + %end + %except: + + + - - - - - - + + + + %end {{ str['or-sign-up'] }} diff --git a/index_ezabatu.tpl b/index_ezabatu.tpl new file mode 100644 index 0000000..cb1592d --- /dev/null +++ b/index_ezabatu.tpl @@ -0,0 +1,46 @@ + + + + + + + + + {{ str['login'] }} + + + + + +
+

{{ str['login'] }}

+ + %try: + %if two_factor_authentication: + + + + %end + %except: + + + + + + + + %end + + + {{ str['or-sign-up'] }} + + + %for type, text, animation in get('alerts', []): +
+
{{ text }}
+
+ %end + +
+ + diff --git a/libs/helper.py b/libs/helper.py index e8b9214..c4d1592 100644 --- a/libs/helper.py +++ b/libs/helper.py @@ -60,7 +60,7 @@ class Tools(): self.gen_qr(secret) return secret - def 2fa_validation(self, otp): + def _2fa_validation(self, otp, secret): authenticated = valid_totp(otp, secret) if authenticated: print('Correct otp, Authenticated!') @@ -69,5 +69,4 @@ class Tools(): print('Wrong otp, please try again.') return False - -Tools = Tools() +tools = Tools() diff --git a/uwsgi.ini.example b/uwsgi.ini.example index 28646b0..6f37460 100644 --- a/uwsgi.ini.example +++ b/uwsgi.ini.example @@ -4,5 +4,6 @@ http = :8080 chdir = %v wsgi-file = %v/app.py +enable-thread = true processes = 1 threads = 2