在Python后端项目FastApi中使用MongoDB进行数据处理

我在前面随笔《在SqlSugar的开发框架中增加对低代码EAV模型(实体-属性-值)的WebAPI实现支持》中介绍了对于EAV数据存储的时候,我们把变化字段的数据记录存储在MongoDB数据库里面,这样除了支持动态化字段变化外,也更好的支持对字段不同类型的查询处理,之前随笔介绍的是基于C#操作MongoDB数据库的处理,由于Python后端FastApi项目的设计初衷是可以平滑更换 SqlSugar项目的Web API的,因此会涉及到在Python项目中对MongoDB的相关操作。本篇随笔先对Python环境中操作MongoDB数据库进行相关的介绍。

1、Python环境中操作MongoDB数据库的准备

如果需要了解相关MongoDB数据库的相关信息和C#开发MongoDB的内容,可以参考我的随笔《MongoDB数据库内容》,里面包含我对这个主题的相关介绍。

具体可以进一步参考官网里面的介绍。

https://www.mongodb.com/zh-cn/docs/

https://www.mongodb.com/zh-cn/docs/drivers/csharp/current/

https://www.mongodb.com/zh-cn/docs/languages/python/

https://www.mongodb.com/zh-cn/docs/drivers/motor/

如果我们需要下载安装,可以根据不同的操作系统下载对应的社区版本安装文件即可。

https://www.mongodb.com/zh-cn/products/self-managed/community-edition

安装完成后,可以使用管理工具进行MongoDB数据库的连接和创建,如下所示可以根据不同的需要创建不同的集合。

关系型数据库和MongoDB的数据库,它们的相关概念,对比关系图如下所示。

我们安装MongoDB数据库后,它是驻留在Window的服务里面,创建服务并顺利启动成功后,然后就可以在系统的服务列表里查看到了,我们确认把它设置为自动启动的Windows服务即可。

由于我们希望使用异步来操作MongoDB数据库,推荐使用motor驱动来操作它。Motor是一个异步mongodb driver,支持异步读写mongodb。

具体使用我们可以参考官网的介绍:https://www.mongodb.com/zh-cn/docs/drivers/motor/

使用前,我们需要再我们FastAPI项目中安装MongoDB和Motor的依赖模块。就是pymonogo和motor

我的requirement.txt文件中包含下面两个

 我们如果没有初始化安装,通过pip进行安装即可。

pip install pymongo motor

使用它的基础代码如下所示。

import motor.motor_asyncio
client = motor.motor_asyncio.AsyncIOMotorClient()
或者
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")
或者
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://username:password@localhost:27017/dbname')

根据MongoDB官网的说明,MongoDB的适用场景如下:

1)网站实时数据:MongoDB非常适合实时的插入,更新与查询,并具备网站实时数据存储所需的复制及高度伸缩性。

2)数据缓存:由于性能很高,MongoDB也适合作为信息基础设施的缓存层。在系统重启之后,由MongoDB搭建的持久化缓存层可以避免下层的数据源过载。

3)大尺寸、低价值数据存储:使用传统的关系型数据库存储一些数据时可能会比较昂贵,在此之前,很多时候程序员往往会选择传统的文件进行存储。

4)高伸缩性场景:MongoDB非常适合由数十或数百台服务器组成的数据库。MongoDB的路线图中已经包含对MapReduce引擎的内置支持。

5)对象或JSON数据存储:MongoDB的BSON数据格式非常适合文档化格式的存储及查询。

MongoDB数据库支持常规的增删改查等操作,其中它的 find方法很强大,可以组合很多条件查询的方式,如下所示:

db.collection.find({ "key" : value }) 查找key=value的数据
db.collection.find({ "key" : { $gt: value } }) key > value
db.collection.find({ "key" : { $lt: value } }) key < value
db.collection.find({ "key" : { $gte: value } }) key >= value
db.collection.find({ "key" : { $lte: value } }) key <= value
db.collection.find({ "key" : { $gt: value1 , $lt: value2 } }) value1 < key <value2
db.collection.find({ "key" : { $ne: value } }) key <> value
db.collection.find({ "key" : { $mod : [ 10 , 1 ] } }) 取模运算,条件相当于key % 10 == 1 即key除以10余数为1的
db.collection.find({ "key" : { $nin: [ 1, 2, 3 ] } }) 不属于,条件相当于key的值不属于[ 1, 2, 3 ]中任何一个
db.collection.find({ "key" : { $in: [ 1, 2, 3 ] } }) 属于,条件相当于key等于[ 1, 2, 3 ]中任何一个
db.collection.find({ "key" : { $size: 1 } }) $size 数量、尺寸,条件相当于key的值的数量是1(key必须是数组,一个值的情况不能算是数量为1的数组)
db.collection.find({ "key" : { $exists : true|false } }) $exists 字段存在,true返回存在字段key的数据,false返回不存在字度key的数据
db.collection.find({ "key": /^val.*val$/i }) 正则,类似like;“i”忽略大小写,“m”支持多行
db.collection.find({ $or : [{a : 1}, {b : 2} ] }) $or或 (注意:MongoDB 1.5.3后版本可用),符合条件a=1的或者符合条件b=2的数据都会查询出来
db.collection.find({ "key": value , $or : [{ a : 1 } , { b : 2 }] }) 符合条件key=value ,同时符合其他两个条件中任意一个的数据
db.collection.find({ "key.subkey" :value }) 内嵌对象中的值匹配,注意:"key.subkey"必须加引号
db.collection.find({ "key": { $not : /^val.*val$/i } }) 这是一个与其他查询条件组合使用的操作符,不会单独使用。上述查询条件得到的结果集加上$not之后就能获得相反的集合。

比较符号说明如下:

符  号含  义示  例
$lt小于{'age': {'$lt': 20}}
$gt大于{'age': {'$gt': 20}}
$lte小于等于{'age': {'$lte': 20}}
$gte大于等于{'age': {'$gte': 20}}
$ne不等于{'age': {'$ne': 20}}
$in在范围内{'age': {'$in': [20, 23]}}
$nin不在范围内{'age': {'$nin': [20, 23]}}

另外,还可以进行正则匹配查询。例如,查询名字以 M 开头的学生数据,示例如下:

results = collection.find({'name': {'$regex': '^M.*'}})

这里使用 $regex 来指定正则匹配,^M.* 代表以 M 开头的正则表达式。

多条件查询  $and $or

# and查询
db.collection.find({
 $and : [
 { "age" : {$gt : 10 }} ,
 { "gender" : "man" }
 ]
})
#or查询
db.collection.find({
 $or : [
 {"age" : {$gt : 10 }},
 { "gender" : "man"}
 ]
})
#and查询 和 or查询
db.inventory.find( {
 $and : [
 { $or : [ { price : 0.99 }, { price : 1.99 } ] },
 { $or : [ { sale : true }, { qty : { $lt : 20 } } ] }
 ]
} )

关于这些操作的更详细用法,可以在 MongoDB 官方文档找到: https://docs.mongodb.com/manual/reference/operator/query/

当然还有插入更新的处理语句也是很特别的。

db.student.insert({name:'student1',subject:['arts','music']})

特别是更新操作需要说明一下,支持常规的$set方法(修改)、$unset方法(移除指定的键),还有原子级的$inc方法(数值增减),$rename方法(重命名字段名称)等等,

db.users.update({"_id" : ObjectId("51826852c75fdd1d8b805801")}, {"$set" : {"hobby" :["swimming","basketball"]}} )
db.users.update({"_id" : ObjectId("51826852c75fdd1d8b805801")},{"$unset" : {"hobby" :1 }} )
db.posts.update({"_id" : ObjectId("5180f1a991c22a72028238e4")}, {"$inc":{"pageviews":1}})
db.students.update( { _id: 1 }, { $rename: { 'nickname': 'alias', 'cell': 'mobile' } }

upsert是一种特殊的更新操作,不是一个操作符。(upsert = up[date]+[in]sert),也就是如果存在则更新,否则就写入一条新的记录操作。这个参数是个布尔类型,默认是false。

db.users.update({age :25}, {$inc :{"age" :3}}, true)

另外,Update可以对Json的集合进行处理,如果对于subject对象是一个集合的话,插入或更新其中的字段使用下面的语句

db.student.update({name:'student5'},{$set:{subject:['music']}},{upsert:true});

如果是记录已经存在,我们可以使用索引数值进行更新其中集合里面的数据,如下所示。

db.student.update({name:'student3'},{$set:{'subject.0':'arts'}});

 

2、在FastAPI项目中使用pymongo 和motor 操作MongoDB

上面我们大致介绍了一些基础的MongoDB数据库信息。

之前随笔《Python 开发环境的准备以及一些常用类库模块的安装》介绍过我们的FastAPI配置信息,存储在.env文件中的,在FastAPI项目启动的时候,根据需要进行读取加载到对象里面。

如对于配置信息的处理,我们还可以引入 python-dotenv 和  pydantic_settings 来统一管理配置参数。我们的项目.env文件部分配置如下。

 最后我们通过pydantic_settings的配置处理,获得相关的配置对象信息,如下。

其中的MONGO_URL 就是我们具体使用的MogoDB数据库连接信息了。

为了和数据库连接一样方便使用MongoDB的连接,我们可以通过在一个独立的文件中声明获得MongoDB的数据库和集合对象,并且通过缓存的方式提高使用效率。

from motor.motor_asyncio import AsyncIOMotorClient
from motor.motor_asyncio import AsyncIOMotorCollection
from functools import lru_cache
from core.config import settings
class MongoClientManager:
 def __init__(self, uri: str = settings.MONGO_URL):
 """初始化 MongoDB 客户端管理器
 Args:
 uri = mongodb://{MONGO_HOST}:{MONGO_PORT}/{MONGO_DB_NAME}"""
 self._client = AsyncIOMotorClient(uri)
 self._db = self._client.get_default_database() # 使用 URI 中指定的 dbname
 @property
 def db(self):
 return self._db
 def close(self):
 self._client.close()
# 创建全局 Mongo 管理器实例
@lru_cache()
def get_mongo_client() -> MongoClientManager:
 return MongoClientManager()
@lru_cache()
def get_collection(entity_model: str) -> AsyncIOMotorCollection:
 """
 获取指定名称的集合(collection),类型是 BsonDocument(等价于 Python dict)
 MongoDB 的集合通常在首次写入数据时自动创建,因此你只需要获取对应集合即可,不需要手动“创建”
 """
 db = get_mongo_client().db
 return db[entity_model]

这样,我们项目启动的时候,自动加载配置文件,并在这里初始化MongoDBClient的异步对象和集合缓存对象。

有了这些,我们为了方便,还需要对MongoDB数据库的操作进行一些的封装处理,以提高我们对接口的使用遍历,毕竟我们前面介绍到了MongoDB支持非常复杂的查询和处理,我们往往只需要一些特殊的接口即可,因此封装接口有利于我们对接口的使用便利性。

我们在项目的utils目录中增加一个辅助类mongo_helper.py,用来封装MongoDB的相关操作的。

我们截取部分代码,如下所示。

class MongoAsyncHelper:
 """基于 motor 实现MongoDb异步操作,支持:
 insert_one / insert_many
 find_one / find_many
 update_one / update_many
 delete_one / delete_many
 复杂条件、分页、排序、模糊查询等"""
 def __init__(self, collection: AsyncIOMotorCollection):
 self.collection = collection
 async def insert_one(self, data: Dict[str, Any]):
 """插入单条数据
 examples:
 helper = MongoAsyncHelper(collection)
 await helper.insert_one({
 "name": "John",
 "age": 25,
 "created_at": datetime.utcnow()}
 )
 args:
 data: 待插入的数据
 """
 return await self.collection.insert_one(data)
 async def insert_many(self, data_list: List[Dict[str, Any]]):
 """插入多条数据
 examples:
 helper = MongoAsyncHelper(collection)
 await helper.insert_many([
 {"name": "John", "age": 25, "created_at": datetime.utcnow()},
 {"name": "Mary", "age": 30, "created_at": datetime.utcnow()}
 ])
 args:
 data_list: 待插入的数据列表
 """
 return await self.collection.insert_many(data_list)
 async def find_by_id(self, id: str) -> Optional[Dict[str, Any]]:
 """根据 _id 查询单条数据"""
 return await self.find_one({"_id": id})
 async def update_by_id(self, id: str, update_data: Dict[str, Any]):
 """根据 _id 更新单条数据"""
 return await self.update_one({"_id": id}, {"$set": update_data})
 async def delete_by_id(self, id: str):
 """根据 _id 删除单条数据"""
 return await self.delete_one({"_id": id})
 ...............

例如,我们在前面介绍的EAV处理中,获取MongoDB数据库的指定实体类型(对应表)的所有集合处理,在Python的数据处理层代码中如下所示。

async def mongo_get_all(
 self,
 db: AsyncSession,
 entitytype_id: str,
 parentid: str = None,
 sorting: str = None,
 ) -> List[dict]:
 """获取实体类型的所有记录
 :param db: 数据库会话
 :param entitytype_id: 实体类型ID
 :param parentid: 父ID
 :param sorting: 排序条件,格式:name asc 或 name asc,age desc
 """
 items = []
 try:
 entityType: EntityType = await self.__get_entity_type(db, entitytype_id)
 self.logger.info(f"获取实体类型:{entitytype_id}, {entityType.name}")
 if not entityType:
 return items
 # 连接 MongoDB
 collection = get_collection(entityType.entitymodel)
 mongo_helper = MongoAsyncHelper(collection)
 # 构建查询条件
 filter = {}
 if parentid is not None:
 filter["ParentId"] = parentid
 sort_by = parse_sort_string(sorting)
 # print(sort_by)
 items = await mongo_helper.find_all(filter, sort_by)
 return items
 except Exception as e:
 self.logger.error(f"处理失败:{str(e)}")
 return items

首先通过缓存接口get_collection获得对应实体类型的集合,然后传递集合到辅助类 MongoAsyncHelper后构建MongoDB辅助类,就可以利用其接口进行相关的MongoDB数据库操作处理了。

最后通过类似的处理,结合数据库操作和MongoDB数据库操作,我们把EAV的接口服务迁移到了Python中FastAPI项目中了。

最后完成的FastAPI项目的EAV相关接口如下所示。

折叠相关模块显示如下所示。

 最后切换到Winform项目上,调整接入的Web API数据源,同样获得一样的界面效果即可。

产品数据表。

订单数据表

 

作者:伍华聪原文地址:https://www.cnblogs.com/wuhuacong/p/18844209

%s 个评论

要回复文章请先登录注册