123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- import os
- from functools import wraps
- from flask import current_app
- from flask import g
- from flask import has_request_context
- from flask import request, session
- from flask_login import user_logged_in
- from flask_login.config import EXEMPT_METHODS
- from werkzeug.exceptions import Unauthorized
- from werkzeug.local import LocalProxy
- from extensions.ext_database import db
- from models.account import Account, Tenant, TenantAccountJoin
- #: A proxy for the current user. If no user is logged in, this will be an
- #: anonymous user
- current_user = LocalProxy(lambda: _get_user())
- def login_required(func):
- """
- If you decorate a view with this, it will ensure that the current user is
- logged in and authenticated before calling the actual view. (If they are
- not, it calls the :attr:`LoginManager.unauthorized` callback.) For
- example::
- @app.route('/post')
- @login_required
- def post():
- pass
- If there are only certain times you need to require that your user is
- logged in, you can do so with::
- if not current_user.is_authenticated:
- return current_app.login_manager.unauthorized()
- ...which is essentially the code that this function adds to your views.
- It can be convenient to globally turn off authentication when unit testing.
- To enable this, if the application configuration variable `LOGIN_DISABLED`
- is set to `True`, this decorator will be ignored.
- .. Note ::
- Per `W3 guidelines for CORS preflight requests
- <http://www.w3.org/TR/cors/#cross-origin-request-with-preflight-0>`_,
- HTTP ``OPTIONS`` requests are exempt from login checks.
- :param func: The view function to decorate.
- :type func: function
- """
- @wraps(func)
- def decorated_view(*args, **kwargs):
- auth_header = request.headers.get('Authorization')
- admin_api_key_enable = os.getenv('ADMIN_API_KEY_ENABLE', default='False')
- if admin_api_key_enable:
- if auth_header:
- if ' ' not in auth_header:
- raise Unauthorized('Invalid Authorization header format. Expected \'Bearer <api-key>\' format.')
- auth_scheme, auth_token = auth_header.split(None, 1)
- auth_scheme = auth_scheme.lower()
- if auth_scheme != 'bearer':
- raise Unauthorized('Invalid Authorization header format. Expected \'Bearer <api-key>\' format.')
- admin_api_key = os.getenv('ADMIN_API_KEY')
- if admin_api_key:
- if os.getenv('ADMIN_API_KEY') == auth_token:
- workspace_id = request.headers.get('X-WORKSPACE-ID')
- if workspace_id:
- tenant_account_join = db.session.query(Tenant, TenantAccountJoin) \
- .filter(Tenant.id == workspace_id) \
- .filter(TenantAccountJoin.tenant_id == Tenant.id) \
- .filter(TenantAccountJoin.role == 'owner') \
- .one_or_none()
- if tenant_account_join:
- tenant, ta = tenant_account_join
- account = Account.query.filter_by(id=ta.account_id).first()
- # Login admin
- if account:
- account.current_tenant = tenant
- current_app.login_manager._update_request_context_with_user(account)
- user_logged_in.send(current_app._get_current_object(), user=_get_user())
- if request.method in EXEMPT_METHODS or current_app.config.get("LOGIN_DISABLED"):
- pass
- elif not current_user.is_authenticated:
- return current_app.login_manager.unauthorized()
- # flask 1.x compatibility
- # current_app.ensure_sync is only available in Flask >= 2.0
- if callable(getattr(current_app, "ensure_sync", None)):
- return current_app.ensure_sync(func)(*args, **kwargs)
- return func(*args, **kwargs)
- return decorated_view
- def _get_user():
- if has_request_context():
- if "_login_user" not in g:
- current_app.login_manager._load_user()
- return g._login_user
- return None
|