Flask+Redis实现登录权限管理

2021/5/11 19:25:16

本文主要是介绍Flask+Redis实现登录权限管理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Flask+Redis实现登录权限管理

1 准备工作

Redis简单来说就是远程字典服务,通常也被称为数据结构服务器,因为他的值(value)可以是多种形式的。再开始之前,需要先安装Redis,这里先不过多赘述,需要注意的是安装完成后需要设置密码,具体方法可以在百度上搜索,很简单。 首先在Flask配置文件添加Redis的配置信息。

app.config['REDIS_HOST'] = 'localhost'   #Redis的ip地址,本机的就是localhost

app.config['REDIS_PORT'] = 6379   #Redis端口,默认为6379

app.config['REDIS_DB'] = '0'

app.config['REDIS_PWD'] = 'yourpassword'  #Redis的密码

2 Redis数据库操作

在utils文件目录下新建redis_utils.py文件,作用是对redis数据库进行操作。代码如下

import pickle

import redis

from flask import current_app as app

 

class Redis(object):

    """

    redis数据库操作

    """

    @staticmethod

    def _get_r():

        host = app.config['REDIS_HOST']

        port = app.config['REDIS_PORT']

        db = app.config['REDIS_DB']

        passwd = app.config['REDIS_PWD']

        r = redis.StrictRedis(host=host, port=port, db=db, password=passwd)

        return r

 

    @classmethod

    def write(self, key, value, expire=None):

        """

        写入键值对

        """

        # 判断是否有过期时间,没有就设置默认值

        if expire:

            expire_in_seconds = expire

        else:

            expire_in_seconds = app.config['EXPIRES_IN']

        r = self._get_r()

        r.set(key, value, ex=expire_in_seconds)

 

    @classmethod

    def write_dict(self, key, value, expire=None):

        '''

        将内存数据二进制通过序列号转为文本流,再存入redis

        '''

        if expire:

            expire_in_seconds = expire

        else:

            expire_in_seconds = app.config['REDIS_EXPIRE']

        r = self._get_r()

        r.set(pickle.dumps(key), pickle.dumps(value), ex=expire_in_seconds)

 

    @classmethod

    def read_dict(self, key):

        '''

        将文本流从redis中读取并反序列化,返回

        '''

        r = self._get_r()

        data = r.get(pickle.dumps(key))

        if data is None:

            return None

        return pickle.loads(data)

 

    @classmethod

    def read(self, key):

        """

        读取键值对内容

        """

        r = self._get_r()

        value = r.get(key)

        return value.decode('utf-8') if value else value

 

    @classmethod

    def hset(self, name, key, value):

        """

        写入hash表

        """

        r = self._get_r()

        r.hset(name, key, value)

 

    @classmethod

    def hmset(self, key, *value):

        """

        读取指定hash表的所有给定字段的值

        """

        r = self._get_r()

        value = r.hmset(key, *value)

        return value

 

    @classmethod

    def hget(self, name, key):

        """

        读取指定hash表的键值

        """

        r = self._get_r()

        value = r.hget(name, key)

        return value.decode('utf-8') if value else value

 

    @classmethod

    def hgetall(self, name):

        """

        获取指定hash表所有的值

        """

        r = self._get_r()

        return r.hgetall(name)

 

    @classmethod

    def delete(self, *names):

        """

        删除一个或者多个

        """

        r = self._get_r()

        r.delete(*names)

 

    @classmethod

    def hdel(self, name, key):

        """

        删除指定hash表的键值

        """

        r = self._get_r()

        r.hdel(name, key)

 

    @classmethod

    def expire(self, name, expire=None):

        """

        设置过期时间

        """

        if expire:

            expire_in_seconds = expire

        else:

            expire_in_seconds = app.config['REDIS_EXPIRE']

        r = self._get_r()

        r.expire(name, expire_in_seconds)

3 实现用户登录验证

首先注册一个user的蓝图

app.register_blueprint(user_blueprint, url_prefix="/user/")

 

user = Blueprint("user", __name__)

在permisson目录下新建一个user.py。这里是写跟登录有关的接口的。首先是登录验证,大概思路是先接收用户名与密码,然后校验参数,两者都不能为空,接着用用户名去user数据库查找是否存在此用户,如果查找结果为空,则返回一个错误码。接着校验接收到的密码与数据库的密码是否匹配。(在存入密码的时候不能直接存明文,需要加密,此处用到了werkzeug.security这个库进行加密。)校验密码还是用这个库的check_password_hash,只需传入需要验证的两个密码。再用户名与密码都正确的情况下,生成一个token,以后客户端只需要带上这个token来请求即可。这时token需要存入Redis中,客户端传来的token就和Redis中的token作对比。当生成token后,这就登录成功了,然后返回token等用户信息。接下来是login的代码

@user.route('/login', methods=["POST"])

def login():

    '''

    用户登录

    :return:token

    '''

    data = request.get_data()

    data = str(data, 'utf-8')

    res_dir = json.loads(data)

    print(res_dir)

    if res_dir is None:

        return NO_PARAMETER()

    # 获取前端传过来的参数

    username = res_dir.get("username")

    password = res_dir.get("password")

    # 校验参数

    if not all([username, password]):

        return jsonify(code=Code.NOT_NULL.value, msg="用户名和密码不能为空")

    try:

        user = User.query.filter_by(user_name=username).first()

    except Exception as e:

        print("login error:{}".format(e))

        return jsonify(code=Code.REQUEST_ERROR.value, msg="获取信息失败")

    if user is None or not user.check_pwd(password) or user.del_flag == 2 or user.status == 2:

        return jsonify(code=Code.ERR_PWD.value, msg="用户名或密码错误")

 

    # 获取用户信息,传入生成token的方法,并接收返回的token

    # 获取用户角色

    user_role = Role.query.join(UserRole, Role.id == UserRole.role_id).join(User,

                                                                              UserRole.user_id == user.id).filter(

        User.id == user.id).all()

    role_list = [i.role_key for i in user_role]

    token = create_token(user.id, user.user_name, role_list)

    data = {'token': token, 'userId': user.id, 'userName': user.user_name, 'nickname': user.nickname}

    # 记录登录ip将token存入rerdis

    try:

        user.login_ip = request.remote_addr

        print(user)

        user.update()

        Redis.write(f"token_{user.user_name}", token)

 

    except Exception as e:

        print(e)

        return jsonify(code=Code.UPDATE_DB_ERROR.value, msg="登录失败:"+str(e))

    if token:

        # 把token返回给前端

        return jsonify(code=Code.SUCCESS.value, msg="登录成功", data=data)

    else:

        return jsonify(code=Code.REQUEST_ERROR.value, msg="请求失败", data=token)

生成token的方法

def create_token(user_id, user_name, role_list):

    '''

    生成token

    :param api_user:用户id

    :return: token

    '''

    # 第一个参数是内部的私钥,这里写在共用的配置信息里了,如果只是测试可以写死

    # 第二个参数是有效期(秒)

    s = Serializer(app.config['SECRET_KEY'], expires_in=app.config['EXPIRES_IN'])

    # 接收用户id转换与编码

    token = None

    try:

        token = s.dumps({"id": user_id, "name": user_name, "role": role_list}).decode("ascii")

    except Exception as e:

        app.logger.error("获取token失败:{}".format(e))

    return token

校验token方法:

def verify_token(token):

    '''

    校验token

    :param token:

    :return: 用户信息 or None

    '''

    # 参数为私有秘钥,跟上面方法的秘钥保持一致

    s = Serializer(app.config['SECRET_KEY'])

    try:

        # 转换为字典

        data = s.loads(token)

        return data

    except Exception as e:

        app.logger.error(f"token转换失败:{e}")

        return None

注销登录,接收到token后作对比,成功后将redis中token删除。

@user.route('/logout', methods=["POST"])

@login_required()

def logout():

    '''

    注销方法:redis删除token

    :return:

    '''

    try:

        token = request.headers["Authorization"]

        user = verify_token(token)

        if user:

            key = f"token_{user.get('name')}"

            redis_token = Redis.read(key)

            if redis_token:

                Redis.delete(key)

            return SUCCESS()

        else:

            return AUTH_ERR()

    except Exception as e:

        app.logger.error(f"注销失败")

        return REQUEST_ERROR()

检查是否登录

@user.route('/check_token', methods=["POST"])

def check_token():

    # 在请求头上拿到token

    token = request.headers["Authorization"]

    user = verify_token(token)

    if user:

        key = f"token_{user.get('name')}"

        redis_token = Redis.read(key)

        if redis_token == token:

            return SUCCESS(data=user.get('id'))

        else:

            return OTHER_LOGIN()

    else:

        return AUTH_ERR()

最后是需要做一个登录拦截器,意思是在客户端访问某些接口时,需要先进行登录验证,通过以后才能正常访问。

def login_required(*role):

    def decorator(func):

        @functools.wraps(func)

        def wrapper(*args, **kw):

            try:

                # 在请求头上拿到token

                token = request.headers["Authorization"]

            except Exception as e:

                # 没接收的到token,给前端抛出错误

                return jsonify(code=Code.NO_PARAMETER.value, msg='缺少参数token')

            s = Serializer(app.config['SECRET_KEY'])

            try:

                user = s.loads(token)

                if role:

                    # 获取token中的权限列表如果在参数列表中则表示有权限,否则就表示没有权限

                    user_role = user['role']

                    result = [x for x in user_role if x in list(role)]

                    if not result:

                        return jsonify(code=Code.ERR_PERMISSOM.value, msg="权限不够")

            except Exception as e:

                return jsonify(code=Code.LOGIN_TIMEOUT.value, msg="登录已过期")

            return func(*args, **kw)

        return wrapper

    return decorator

只需要在需要的接口下面加上@login_required()即可。例如

@home.route('/')

@login_required()

def test():

    return jsonify({

        'code': 0,

        'msg': 'test',

    })

 

 

 

 

 

 

 

 

刘玉江

2020年9月13日

 



这篇关于Flask+Redis实现登录权限管理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程