介绍
推荐系统使应用程序能够为用户生成智能提议,有效地从其他内容中筛选出相关内容。在本文中,我们构建并部署了一个动态的视频游戏推荐系统,利用PostgreSQL、FastAPI和Render来根据用户与之交互的游戏推荐新游戏。我们的目的是提供一个清晰的示例,展示如何构建一个独立的推荐系统,然后可以将其与前端系统或其他应用程序进行连接。
对于这个项目,我们使用从Steam API获取的视频游戏数据,但这很容易被你感兴趣的任何产品数据所替代,关键步骤将是一样的。我们将介绍如何将这些数据存储在数据库中,将游戏标签向量化,基于用户与游戏的互动生成类似度分数,并返回一系列相关推荐。在本文末尾,我们将部署这个推荐系统作为一个使用FastAPI的Web应用程序,这样每当用户与新游戏互动时,我们就可以动态生成并存储一组新的推荐给该用户。
将使用以下工具:
· PostgreSQL
· FastAPI
· Docker
· 呈现
那些只对 GitHub 仓库感兴趣的人可以在这里找到。
目录
由于这个项目很长,它被分成了两篇文章。第一部分涵盖了这个项目背后的设置和理论(下面显示的步骤1-5),第二部分涵盖了部署它。如果你正在寻找第二部分,它位于这里。
第一部分
1. 数据集概览
2. 整体系统架构
3. 数据库设置
4. FastAPI 设置- 模型- 路由
5. 构建类似性管道
第二部分:
6. 在 Render 上部署 PostgreSQL 数据库
7. 将FastAPI应用程序部署为Render Web应用程序- Docker化我们的应用程序- 将Docker镜像推送到DockerHub- 从DockerHub拉取到Render
数据集概述
这个项目的数据集包含了来自steamworks API的前~2000款游戏的数据。这些数据是免费的,并且授权供个人和商业使用,但需遵守服务条款,其中规定了每5分钟最多200次请求的速率限制,这导致我们只能使用数据的一个子集。服务条款可以在这里找到。
以下是游戏数据集的概述。大多数字段都相对自我描述;需要注意的关键事项是唯一的产品标识符是appid。除了这个数据集,我们还有几个额外的表格,我们将在下面详细介绍;对于我们的推荐系统来说,最重大的是游戏标签表,其中包含将每个标签与游戏相关联的appid值(策略、RPG、卡牌游戏等)。这些标签是从数据概述中显示的类别字段中提取出来的,然后进行了旋转,以创建游戏标签表,以便每个appid:类别组合都有一个唯一的行。

图2:数据集概览

有关我们项目结构的更详细概述,请参见下面的图表。

图3:项目文件结构

目前我们将简要介绍该项目的架构,然后深入探讨如何填充我们的数据库。
建筑
对于我们的推荐系统,我们将使用一个带有FastAPI数据访问+处理层的PostgreSQL数据库,这将允许我们向用户的游戏列表中添加或移除游戏。用户通过FastAPI的POST请求对其游戏库进行更改时,还将启动一个利用FastAPI的后台任务功能的推荐流程,该流程将从数据库中查询他们喜爱的游戏,计算与不喜爱的游戏的类似度分数,并将新的前N个推荐游戏更新到用户推荐表中。最后,PostgreSQL数据库和FastAPI服务将部署在Render上,以便可以在我们的本地环境之外访问它们。在这个部署步骤中,任何云服务都可以使用,但在这种情况下我们选择了Render,由于它的简单性。
总结一下,从用户的角度来看,我们的整体工作流程将如下所示:
8. 用户通过从FastAPI向我们的数据库发出POST请求将游戏添加到他们的库中。如果我们想将我们的推荐系统附加到前端应用程序,我们可以轻松地将这个Post API与用户界面联系起来。
9. 此POST请求启动了一个FastAPI后台任务,该任务运行我们的推荐系统流程。
10. 推荐系统管道会查询我们的数据库,获取用户的游戏列表和全局游戏列表。
11. 然后使用我们的游戏标签计算用户的游戏与所有游戏之间的类似度分数。
12. 最后,我们的推荐系统管道向数据库发出一个POST请求,以更新该用户的推荐游戏表。
· 如果我们想要将我们的推荐系统连接到一个前端应用程序,我们可以很容易地将这个帖子API与用户界面联系起来。

图4:推荐系统图解

设置数据库
在构建推荐系统之前,第一步是设置我们的数据库。我们的基本数据库图表如图5所示。我们之前讨论过我们的游戏表;这是其他数据一般派生的基础数据集。我们的所有表的完整列表如下:
· 游戏表:包含我们数据库中每个独特游戏的基本游戏数据
· 用户表:一个包含示例信息的虚拟用户表,用于示例填充。
· User_GameTable:包含用户“喜爱”的所有游戏之间的映射;这个表是用来生成推荐的基本表之一,通过捕捉用户感兴趣的游戏。
· Game_TagsTable:这包含一个 appid:game_tag 映射,其中游戏标签可以是诸如“策略”、“角色扮演”、“喜剧”之类的内容,是捕捉游戏本质一部分的描述性标签。每个 appid 映射到多个标签。
· 用户推荐表:这是我们的目标表,将由我们的流水线更新。每当用户与新游戏互动时,我们的推荐流水线将运行并为该用户生成一系列新的推荐,这些推荐将存储在这里。
游戏
用户
用户游戏
游戏标签
用户推荐

图5:数据库图解

要设置这些表,我们只需运行我们的src/load_database.py文件。该文件会按照下面概述的几个步骤创建和填充我们的表。请注意,目前我们将专注于理解如何将这些数据写入通用数据库,所以您目前需要知道的是下面的External_Database_Url是您想要使用的数据库的URL。在本文的后半部分,我们将介绍如何在Render上设置数据库并将URL复制到您的.env文件中。
src/load_database.py
外部数据库网址
“`python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from
sqlalchemy.ext.declarative import declarative_base
import os
from dotenv import load_dotenv
from utils.db_handler import DatabaseHandler
import pandas as pd
import uuid
import sys
from sqlalchemy.exc import OperationalError
import psycopg2
# 加载环境变量
load_dotenv(override=True)
# 为Render构建PostgreSQL连接URL
URL_database = os.environ.get(“External_Database_Url”)
# 使用我们的URL初始化DatabaseHandler
engine = DatabaseHandler(URL_database)
# 加载初始用户数据
users_df = pd.read_csv(“Data/users.csv”)
games_df = pd.read_csv(“Data/games.csv”)
user_games_df = pd.read_csv(“Data/user_games.csv”)
user_recommendations_df = pd.read_csv(“
Data/user_recommendations.csv”)
game_tags_df = pd.read_csv(“Data/game_tags.csv”)
“`
“`python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from
sqlalchemy.ext.declarative import declarative_base
import os
from dotenv import load_dotenv
from utils.db_handler import DatabaseHandler
import pandas as pd
import uuid
import sys
from sqlalchemy.exc import OperationalError
import psycopg2
# 加载环境变量
load_dotenv(override=True)
# 为Render构建PostgreSQL连接URL
URL_database = os.environ.get(“External_Database_Url”)
# 使用我们的URL初始化DatabaseHandler
engine = DatabaseHandler(URL_database)
# 加载初始用户数据
users_df = pd.read_csv(“Data/users.csv”)
games_df = pd.read_csv(“Data/games.csv”)
user_games_df = pd.read_csv(“Data/user_games.csv”)
user_recommendations_df = pd.read_csv(“
Data/user_recommendations.csv”)
game_tags_df = pd.read_csv(“Data/game_tags.csv”)
“`
第一,我们从我们的Data文件夹中加载五个CSV文件到数据框中;我们为数据库图中显示的每个表格准备了一个文件。我们还通过声明一个engine变量来建立与数据的连接;这个engine变量使用一个
customDataBaseHandlerclass,其初始化方法如下所示。这个类接受一个连接字符串到我们在Render(或您偏好的云服务)上的数据库,该连接字符串从我们的.env文件中传入,并包含我们的数据库连接、更新、删除和测试功能。
数据库处理程序
加载数据并实例化ourDatabaseHandler类后,我们需要定义一个查询来创建这五个表,并使用
DatabaseHandler.create_table函数执行这些查询。这是一个超级简单的函数,它连接到我们的数据库,执行查询,然后关闭连接,使我们在数据库图中看到的五个表为空;但是,它们目前是空的。
数据库处理程序
DatabaseHandler.create_table
# 定义用于创建表的查询
user_table_creation_query = “””CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY,
username VARCHAR(255) UNIQUE NOT NULL,
password VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
role VARCHAR(50) NOT NULL
)
“””
game_table_creation_query = “””CREATE TABLE IF NOT EXISTS games (
id UUID PRIMARY KEY,
appid VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
type VARCHAR(255),
is_free BOOLEAN DEFAULT FALSE,
short_description TEXT,
detailed_description TEXT,
developers VARCHAR(255),
publishers VARCHAR(255),
price VARCHAR(255),
genres VARCHAR(255),
categories VARCHAR(255),
release_date VARCHAR(255),
platforms TEXT,
metacritic_score FLOAT,
recommendations INTEGER
)
“””
user_games_query = “””CREATE TABLE IF NOT EXISTS user_games (
id UUID PRIMARY KEY,
username VARCHAR(255) NOT NULL,
appid VARCHAR(255) NOT NULL,
shelf VARCHAR(50) DEFAULT 'Wish_List',
rating FLOAT DEFAULT 0.0,
review TEXT
)
“””
recommendation_table_creation_query = “””CREATE TABLE IF NOT EXISTS user_recommendations (
id UUID PRIMARY KEY,
username VARCHAR(255),
appid VARCHAR(255),
similarity FLOAT
)
“””
game_tags_creation_query = “””CREATE TABLE IF NOT EXISTS game_tags (
id UUID PRIMARY KEY,
appid VARCHAR(255) NOT NULL,
category VARCHAR(255) NOT NULL
)
“””
# 运行查询以创建表
engine.delete_table('user_recommendations')
engine.delete_table('user_games')
engine.delete_table('game_tags')
engine.delete_table('games')
engine.delete_table('users')
# 创建表
engine.create_table(user_table_creation_query)
engine.create_table(game_table_creation_query)
engine.create_table(user_games_query)
engine.create_table(
recommendation_table_creation_query)
engine.create_table(game_tags_creation_query)
# 定义用于创建表的查询
user_table_creation_query = “””CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY,
username VARCHAR(255) UNIQUE NOT NULL,
password VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
role VARCHAR(50) NOT NULL
)
“””
game_table_creation_query = “””CREATE TABLE IF NOT EXISTS games (
id UUID PRIMARY KEY,
appid VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
type VARCHAR(255),
is_free BOOLEAN DEFAULT FALSE,
short_description TEXT,
detailed_description TEXT,
developers VARCHAR(255),
publishers VARCHAR(255),
price VARCHAR(255),
genres VARCHAR(255),
categories VARCHAR(255),
release_date VARCHAR(255),
platforms TEXT,
metacritic_score FLOAT,
recommendations INTEGER
)
“””
user_games_query = “””CREATE TABLE IF NOT EXISTS user_games (
id UUID PRIMARY KEY,
username VARCHAR(255) NOT NULL,
appid VARCHAR(255) NOT NULL,
shelf VARCHAR(50) DEFAULT 'Wish_List',
rating FLOAT DEFAULT 0.0,
review TEXT
)
“””
recommendation_table_creation_query = “””CREATE TABLE IF NOT EXISTS user_recommendations (
id UUID PRIMARY KEY,
username VARCHAR(255),
appid VARCHAR(255),
similarity FLOAT
)
“””
game_tags_creation_query = “””CREATE TABLE IF NOT EXISTS game_tags (
id UUID PRIMARY KEY,
appid VARCHAR(255) NOT NULL,
category VARCHAR(255) NOT NULL
)
“””
# 运行查询以创建表
engine.delete_table('user_recommendations')
engine.delete_table('user_games')
engine.delete_table('game_tags')
engine.delete_table('games')
engine.delete_table('users')
# 创建表
engine.create_table(user_table_creation_query)
engine.create_table(game_table_creation_query)
engine.create_table(user_games_query)
engine.create_table(
recommendation_table_creation_query)
engine.create_table(game_tags_creation_query)
在初始表格设置完成后,我们会运行质量检查,以确保每个数据集都有必需的ID列,将数据从数据框填充到相应的表中,然后进行测试以确保表被正确填充。如果设置工作正常,test_table函数将返回一个字典,形式为{'table_exists': True, 'table_has_data': True}。
{'table_exists': True, 'table_has_data': True}
# 确保每个数据框的每一行都有一个唯一的ID
if 'id' not in users_df.columns:
users_df['id'] = [str(uuid.uuid4()) for _ in range(len(users_df))]
if 'id' not in games_df.columns:
games_df['id'] = [str(uuid.uuid4()) for _ in range(len(games_df))]
if 'id' not in user_games_df.columns:
user_games_df['id'] = [str(uuid.uuid4()) for _ in range(len(user_games_df))]
if 'id' not in
user_recommendations_df.columns:
user_recommendations_df['id'] = [str(uuid.uuid4()) for _ in range(len(user_recommendations_df))]
if 'id' not in game_tags_df.columns:
game_tags_df['id'] = [str(uuid.uuid4()) for _ in range(len(game_tags_df)]
# 用数据框中的数据填充4个表
engine.populate_table_dynamic(users_df, 'users')
engine.populate_table_dynamic(games_df, 'games')
engine.populate_table_dynamic(user_games_df, 'user_games')
engine.populate_table_dynamic(user_recommendations_df, 'user_recommendations')
engine.populate_table_dynamic(game_tags_df, 'game_tags')
# 测试表是否被正确创建和填充
print(engine.test_table('users'))
print(engine.test_table('games'))
print(engine.test_table('user_games'))
print(engine.test_table('user_recommendations'))
print(engine.test_table('game_tags'))
# 确保每个数据框的每一行都有一个唯一的ID
if 'id' not in users_df.columns:
users_df['id'] = [str(uuid.uuid4()) for _ in range(len(users_df))]
if 'id' not in games_df.columns:
games_df['id'] = [str(uuid.uuid4()) for _ in range(len(games_df))]
if 'id' not in user_games_df.columns:
user_games_df['id'] = [str(uuid.uuid4()) for _ in range(len(user_games_df))]
if 'id' not in
user_recommendations_df.columns:
user_recommendations_df['id'] = [str(uuid.uuid4()) for _ in range(len(user_recommendations_df))]
if 'id' not in game_tags_df.columns:
game_tags_df['id'] = [str(uuid.uuid4()) for _ in range(len(game_tags_df)]
# 用数据框中的数据填充4个表
engine.populate_table_dynamic(users_df, 'users')
engine.populate_table_dynamic(games_df, 'games')
engine.populate_table_dynamic(user_games_df, 'user_games')
engine.populate_table_dynamic(user_recommendations_df, 'user_recommendations')
engine.populate_table_dynamic(game_tags_df, 'game_tags')
# 测试表是否被正确创建和填充
print(engine.test_table('users'))
print(engine.test_table('games'))
print(engine.test_table('user_games'))
print(engine.test_table('user_recommendations'))
print(engine.test_table('game_tags'))
使用FastAPI入门
目前我们已经设置并填充了数据库,我们需要使用 FastAPI 构建访问、更新和删除数据的方法。FastAPI 可以让我们轻松地构建标准化(和快速的)API,以便与我们的数据库进行交互。FastAPI 文档提供了一个很好的逐步教程,可以在这里找到。作为一个高层次的总结,FastAPI 具有几个出色的特性,使其成为作为数据库和前端应用程序之间交互层的理想选择。
13. 标准化:FastAPI允许我们使用GET、POST、DELETE、UPDATE等方法以标准化的方式定义路由,与我们的表进行交互。这种标准化使我们能够在纯Python中构建数据访问层,然后可以与各种前端应用程序进行交互。我们只需在前端调用我们想要的API方法,而不管它是用什么语言构建的。
14. 数据验证:正如我们将在下面展示的,我们需要为我们与之交互的每个对象(列如我们的游戏和用户表)定义一个Pydantic数据模型。其主要优势在于确保所有变量都具有定义的数据类型,例如,如果我们定义我们的Game对象,使得评分字段为float类型,而用户尝试发起一个带有评分“great”的新条目的POST请求,那么这个请求将无法成功。这种内置的数据验证将协助我们在系统扩展时防止各种数据质量问题。
15. 异步:FastAPI 函数可以异步运行,意味着它们之间不相互依赖。这可以显著提高性能,由于我们不会让一个慢的 Fast 任务等待另一个慢任务完成。
16. Swagger Docs内置:FastAPI具有内置的用户界面,我们可以在本地主机上导航到该界面,从而可以轻松测试和与我们的路由交互。
获取,提交,删除,更新
FastAPI 模型
我们项目中的FastAPI部分依赖于两个主要文件:models.py,定义我们将要交互的数据模型(游戏、用户等),以及main.py,定义我们实际的FastAPI应用程序并包含我们的路由。在FastAPI的上下文中,路由定义了处理请求的不同路径。例如,我们可能有一个 /games 路由从数据库请求游戏。
游戏
第一,让我们讨论我们的models.py文件。在这个文件中,我们定义了所有的模型。虽然我们为不同的对象有不同的模型,但一般的方法是一样的,所以我们只会详细讨论下面显示的游戏模型。你会注意到下面的第一件事是,我们为我们的游戏对象定义了两个实际的类:一个继承自Pydantic基础模型的GameModel类,和一个继承自sqlalchemy declarative_base的Game类。那么自然的问题是,为什么我们为一个数据结构(我们游戏的数据结构)有两个类?
模型.py
游戏模型
游戏
SQLAlchemy 声明基类
如果我们在这个项目中没有使用 SQL 数据库,而是每次运行 main.py 时将我们的每个 CSV 文件读入数据框架,那么我们就不需要 theGameclass,只需要 theGameModelclass。在这种情况下,我们将读取我们的 games.csv 数据框架,并且 FastAPI 将使用 theGameModelclass 来确保数据类型被正确遵守。
游戏
游戏模型
游戏模型
不过,由于我们正在使用 SQL 数据库,因此最好为我们的 API 和数据库分别创建不同的类,由于这两个类的工作略有不同。我们的 API 类处理数据验证、序列化和可选字段,而我们的数据库类处理数据库特定的问题,列如定义主键/外键、定义对象映射到哪个表以及保护安全数据。再次强调最后一点,我们的数据库中可能有一些仅供内部使用的敏感字段,我们不希望通过 API 将它们暴露给用户(例如密码)。我们可以通过拥有一个面向用户的 Pydantic 类和一个内部 SQL Alchemy 类来解决这个问题。
以下是如何为ourGames对象实现的示例;我们为其他表定义了单独的类,可以在这里找到;不过,总体结构是一样的。
游戏
从pydantic导入BaseModel
从uuid导入UUID,uuid4
从typing导入Optional
从enum导入Enum
从sqlalchemy导入Column, String, Float, Integer
import
sqlalchemy.dialects.postgresql as pg
从
sqlalchemy.dialects.postgresql导入UUID as SA_UUID
从
sqlalchemy.ext.declarative导入declarative_base
import uuid
从uuid导入UUID
# 加载sql模型
从sqlmodel导入Field, Session, SQLModel, create_engine, select
# 为SQLAlchemy模型初始化基类
Base = declarative_base()
# 这是数据库的Game模型
class Game(Base):
__tablename__ = “optigame_products” # PostgreSQL数据库中的表名
id = Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True, nullable=False)
appid = Column(String, unique=True, nullable=False)
name = Column(String, nullable=False)
type = Column(String, nullable=True)
is_free = Column(pg.BOOLEAN, nullable=True, default=False) #
short_description = Column(String, nullable=True)
detailed_description = Column(String, nullable=True)
developers = Column(String, nullable=True)
publishers = Column(String, nullable=True)
price = Column(String, nullable=True)
genres = Column(String, nullable=True)
categories = Column(String, nullable=True)
release_date = Column(String, nullable=True)
platforms = Column(String, nullable=True)
metacritic_score = Column(Float, nullable=True)
recommendations = Column(Integer, nullable=True)
class GameModel(BaseModel):
id: Optional[UUID] = None
appid: str
name: str
type: Optional[str] = None
is_free: Optional[bool] = False
short_description: Optional[str] = None
detailed_description: Optional[str] = None
developers: Optional[str] = None
publishers: Optional[str] = None
price: Optional[str] = None
genres: Optional[str] = None
categories: Optional[str] = None
release_date: Optional[str] = None
platforms: Optional[str] = None
metacritic_score: Optional[float] = None
recommendations: Optional[int] = None
class Config:
orm_mode = True # 启用ORM模式以与SQLAlchemy对象一起使用
from_attributes = True # 启用SQLAlchemy对象的属性访问
从pydantic导入BaseModel
从uuid导入UUID,uuid4
从typing导入Optional
从枚举导入Enum
从SQLAlchemy导入Column,String,Float,Integer
导入SQLAlchemy方言postgresql作为pg
从SQLAlchemy方言postgresql导入UUID作为SA_UUID
从
SQLAlchemy.ext.declarative导入declarative_base
导入uuid
从uuid导入UUID
# 加载sql模型
从sqlmodel导入Field,Session,SQLModel,create_engine,select
# 为SQLAlchemy模型初始化基类
Base = declarative_base()
# 这是数据库的Game模型
类Game(Base):
__tablename__ = “optigame_products” # PostgreSQL数据库中的表名
id = Column(pg.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, unique=True, nullable=False)
appid = Column(String, unique=True, nullable=False)
name = Column(String, nullable=False)
type = Column(String, nullable=True)
is_free = Column(pg.BOOLEAN, nullable=True, default=False) #
short_description = Column(String, nullable=True)
detailed_description = Column(String, nullable=True)
developers = Column(String, nullable=True)
publishers = Column(String, nullable=True)
price = Column(String, nullable=True)
genres = Column(String, nullable=True)
categories = Column(String, nullable=True)
release_date = Column(String, nullable=True)
platforms = Column(String, nullable=True)
metacritic_score = Column(Float, nullable=True)
recommendations = Column(Integer, nullable=True)
类GameModel(BaseModel):
id: Optional[UUID] = None
appid: str
name: str
type: Optional[str] = None
is_free: Optional[bool] = False
short_description: Optional[str] = None
detailed_description: Optional[str] = None
developers: Optional[str] = None
publishers: Optional[str] = None
price: Optional[str] = None
genres: Optional[str] = None
categories: Optional[str] = None
release_date: Optional[str] = None
platforms: Optional[str] = None
metacritic_score: Optional[float] = None
recommendations: Optional[int] = None
类Config:
orm_mode = True # 启用ORM模式以处理SQLAlchemy对象
from_attributes = True # 启用SQLAlchemy对象的属性访问
设置 FastAPI 路由
在我们定义了模型之后,我们可以创建方法与这些模型进行交互,并从数据库请求数据(GET),向数据库添加数据(POST),或从数据库中删除数据(DELETE)。以下是我们如何为游戏模型定义GET请求的示例。在main.py函数的开头,我们进行一些初始设置以获取数据库URL并连接到数据库。然后我们初始化我们的应用程序,并添加中间件来定义我们将接受请求的URL。由于我们将在Render上部署FastAPI项目,并从本地机器向其发送请求,我们允许的唯一来源是localhost端口8000。然后我们定义了我们的app.get方法fetch_products,它接受一个appid输入,查询我们的数据库,找到appid等于我们筛选后的appid的Game对象,并返回这些产品。
请注意下面的片段仅包含设置和第一个获取方法,其余部分都相当类似并且可以在存储库中找到,因此我们不会在这里为每个部分提供深入解释。
“`python
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from uuid import uuid4, UUID
from sqlalchemy import create_engine
from
sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from dotenv import load_dotenv
import os
# 加载环境变量
load_dotenv()
# 安全相关导入
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer
# 自定义导入
from src.models import User, Game, GameModel, UserModel, UserGameModel, UserGame, GameSimilarity,GameSimilarityModel, UserRecommendation, UserRecommendationModel
from src.similarity_pipeline import UserRecommendationService
# 从环境变量或.env文件中加载数据库连接字符串
DATABASE_URL = os.environ.get(“Internal_Database_Url”)
# 创建数据库连接
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# 创建数据库表(如果不存在)
Base.metadata.create_all(bind=engine)
# 用于获取数据库会话的依赖项
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 初始化FastAPI应用
app = FastAPI(title=”Game Store API”, version=”1.0.0″)
# 添加CORS中间件以允许请求
origins = [“http://localhost:8000”]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=[“*”],
allow_headers=[“*”],
)
#
————————————————-#
# ———-PART 1: GET
METHODS——————-#
#
————————————————-#
@app.get(“/”)
async def root():
return {“message”: “Hello World”}
@app.get(“/api/v1/games/”)
async def fetch_products(appid: str = None, db: Session = Depends(get_db)):
# 使用SQLAlchemy Game模型查询数据库
if appid:
products = db.query(Game).filter(Game.appid == appid).all()
else:
products = db.query(Game).all()
return [GameModel.from_orm(product) for product in products]
“`
“`python
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from uuid import uuid4, UUID
from sqlalchemy import create_engine
from
sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from dotenv import load_dotenv
import os
# 加载环境变量
load_dotenv()
# 安全导入
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer
# 自定义导入
from src.models import User, Game, GameModel, UserModel, UserGameModel, UserGame, GameSimilarity, GameSimilarityModel, UserRecommendation, UserRecommendationModel
from src.similarity_pipeline import UserRecommendationService
# 从环境变量或.env文件加载数据库连接字符串
DATABASE_URL = os.environ.get(“Internal_Database_Url”)
# 创建到数据库的连接
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# 创建数据库表(如果不存在)
Base.metadata.create_all(bind=engine)
# 获取数据库会话的依赖项
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 初始化FastAPI应用
app = FastAPI(title=”Game Store API”, version=”1.0.0″)
# 添加CORS中间件以允许请求
origins = [“http://localhost:8000”]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=[“*”],
allow_headers=[“*”],
)
#
————————————————-#
# ———-PART 1: GET
METHODS——————-#
#
————————————————-#
@app.get(“/”)
async def root():
return {“message”: “Hello World”}
@app.get(“/api/v1/games/”)
async def fetch_products(appid: str = None, db: Session = Depends(get_db)):
# 使用SQLAlchemy Game模型查询数据库
if appid:
products = db.query(Game).filter(Game.appid == appid).all()
else:
products = db.query(Game).all()
return [GameModel.from_orm(product) for product in products]
“`
一旦我们定义了main.py文件,我们就可以在基础项目目录中使用以下命令最终运行它。
uvicorn src.main:app –reload –> uvicorn src.main:app –reload
uvicorn src.main:app –reload 的翻译结果是:uvicorn src.main:app –reload
完成后,我们可以导航到
http://127.0.0.1:8000/docs并查看下面的交互式FastAPI环境。从这个页面,我们可以测试在我们的main.py文件中定义的任何方法。对于我们的fetch_products函数,我们可以传递一个appid,并从我们的数据库中返回任何匹配的游戏。
主.py
获取产品

图 6:FastAPI Swagger 文档

构建我们的类似性管道
我们已经设置好了数据库,并可以通过FastAPI访问和更新数据;目前是时候转向这个项目的核心特性了:一个推荐系统流水线。推荐系统是一个经过深入研究的领域,我们在这里没有添加任何创新;不过,这将提供一个清晰的示例,展示如何使用FastAPI实现一个基本的推荐系统。
开始 — 如何推荐产品?
如果我们思考问题“我如何推荐用户会喜爱的新产品?”,有两种方法是直观合理的。
17. 协同过滤推荐系统:如果我有一系列用户和一系列产品,我可以通过查看他们的整体产品篮子来识别具有类似兴趣的用户,然后确定给定用户的产品篮子中“缺失”的产品。例如,如果我有用户1-3和产品A-C,用户1-2喜爱所有三种产品,但用户3到目前为止只喜爱产品A + B,那么我可能会向他们推荐产品C。这在逻辑上是有道理的;所有三个用户在他们喜爱的产品中有很高的重叠度,但产品C缺失于用户3的篮子中,因此他们很可能也会喜爱它。通过比较类似用户生成推荐的这个过程被称为协同过滤。
18. 基于内容的推荐系统:如果我有一系列产品,我可以识别与用户喜爱的产品类似的产品,并推荐这些产品。例如,如果我为每款游戏有一系列标签,我可以将每款游戏的标签系列转换为由1和0组成的向量,然后使用类似度度量(在这种情况下是余弦类似度度量)来衡量基于它们的向量的游戏之间的类似性。一旦我完成了这个过程,我就可以根据它们的类似性分数返回用户喜爱的游戏中与之最类似的前N款游戏。
更多关于推荐系统的信息可以在这里找到。
由于我们的初始数据集没有大量用户,我们没有必要的数据来根据用户类似性推荐物品,这被称为冷启动问题。因此,我们将开发基于内容的推荐系统,由于我们有大量的游戏数据可供使用。
要构建我们的流水线,我们必须解决两个挑战:(1)我们如何计算用户的类似度分数,以及(2)如何自动化此流程,以便在用户更新他们的游戏时运行?
我们将讨论如何在用户每次通过“喜爱”游戏发出POST请求时触发类似性流水线,然后介绍如何构建流水线本身。
将推荐管道与FastAPI绑定
目前,想象我们有一个推荐服务,它将更新我们的user_recommendation表。我们希望确保每当用户更新其偏好时都会调用此服务。我们可以按照以下步骤实现这一点;第一,我们定义一个
generate_recommendations_background函数,该函数负责连接到我们的数据库,运行类似性流水线,然后关闭连接。接下来,我们需要确保当用户发出post请求(即喜爱一个新游戏)时调用此函数;为此,我们只需在create_user_gamepost请求函数的末尾添加函数调用。
用户推荐
生成推荐背景
创建用户游戏
该工作流程的结果是,每当用户向我们的user_game表发出post请求时,他们调用create_user_game函数,向数据库添加一个新的user_game对象,然后作为后台函数运行类似性管道。
用户游戏表
创建用户游戏
用户游戏
注意:下面的post方法和辅助函数与我们的其他FastAPI方法一起存储在main.py中。
# 导入类似度管道
from src.similarity_pipeline import UserRecommendationService
# 后台任务函数
def
generate_recommendations_background(username: str, database_url: str):
“””后台任务,为用户生成推荐”””
# 为后台任务创建一个新的数据库会话
background_engine = create_engine(database_url)
BackgroundSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=background_engine)
db = BackgroundSessionLocal()
try:
recommendation_service = UserRecommendationService(db, database_url)
recommendation_service.generate_recommendations_for_user(username)
finally:
db.close()
# 调用后台任务函数的POST方法
@app.post(“/api/v1/user_game/”)
async def create_user_game(user_game: UserGameModel, background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
# 检查条目是否已存在
existing = db.query(UserGame).filter_by(username=user_game.username, appid=user_game.appid).first()
if existing:
raise HTTPException(status_code=400, detail=”用户已拥有此游戏。”)
# 使用默认值准备数据
user_game_data = {
“username”: user_game.username,
“appid”: user_game.appid,
“shelf”: user_game.shelf if user_game.shelf is not None else “Wish_List”,
“rating”: user_game.rating if user_game.rating is not None else 0.0,
“review”: user_game.review if user_game.review is not None else “”
}
if user_game.id is not None:
user_game_data[“id”] = UUID(str(user_game.id))
# 将用户游戏保存到数据库
db_user_game = UserGame(**user_game_data)
db.add(db_user_game)
db.commit()
db.refresh(db_user_game)
# 触发后台任务以为该用户生成推荐
background_tasks.add_task(
generate_recommendations_background, user_game.username, DATABASE_URL)
return db_user_game
# 导入类似性管道
from src.similarity_pipeline import UserRecommendationService
# 后台任务函数
def
generate_recommendations_background(username: str, database_url: str):
“””后台任务,为用户生成推荐”””
# 为后台任务创建一个新的数据库会话
background_engine = create_engine(database_url)
BackgroundSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=background_engine)
db = BackgroundSessionLocal()
try:
recommendation_service = UserRecommendationService(db, database_url)
recommendation_service.generate_recommendations_for_user(username)
finally:
db.close()
# 调用后台任务函数的 POST 方法
@app.post(“/api/v1/user_game/”)
async def create_user_game(user_game: UserGameModel, background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
# 检查条目是否已存在
existing = db.query(UserGame).filter_by(username=user_game.username, appid=user_game.appid).first()
if existing:
raise HTTPException(status_code=400, detail=”用户已拥有此游戏。”)
# 使用默认值准备数据
user_game_data = {
“username”: user_game.username,
“appid”: user_game.appid,
“shelf”: user_game.shelf if user_game.shelf is not None else “Wish_List”,
“rating”: user_game.rating if user_game.rating is not None else 0.0,
“review”: user_game.review if user_game.review is not None else “”
}
if user_game.id is not None:
user_game_data[“id”] = UUID(str(user_game.id))
# 将用户游戏保存到数据库
db_user_game = UserGame(**user_game_data)
db.add(db_user_game)
db.commit()
db.refresh(db_user_game)
# 触发后台任务以为该用户生成推荐
background_tasks.add_task(
generate_recommendations_background, user_game.username, DATABASE_URL)
return db_user_game
构建推荐系统流程
目前我们了解了当用户更新他们喜爱的游戏时如何触发我们的类似度流程,是时候深入了解推荐流程的机制了。我们的推荐流程存储在similarity_pipeline.py中,包含我们展示如何导入和实例化的UserRecommendationService类。这个类包含一系列辅助函数,最终都在
generate_recommendations_for_user方法中调用。这里按顺序调用了7个基本步骤,我们将逐一介绍。
类似性管道.py
用户推荐服务
为用户生成推荐
19. 获取用户的游戏:为了生成类似的游戏推荐,我们需要检索用户已经添加到游戏篮子中的游戏。这通过调用我们的fetch_user_games辅助函数来实现。该函数使用用户的ID查询我们的user_games表,该ID作为输入进行post请求,并返回篮子中的所有游戏。
20. 获取游戏标签:为了比较游戏,我们需要一个维度来进行比较,而这个维度就是与每个游戏相关联的标签(策略游戏、桌游等)。要检索游戏:标签映射,我们调用我们的fetch_all_game_tags函数,该函数返回我们数据库中所有游戏的标签。
21. 将游戏标签向量化:为了比较游戏A和B之间的类似性,我们第一需要使用我们的create_game_vectors函数对游戏标签进行向量化。该函数按照字母顺序获取所有标签,并检查每个标签是否与给定游戏相关联。例如,如果我们的标签总集合是[boardgame, deckbuilding, resource-management],而游戏1只有与boardgame标签相关联,那么它的向量将是[1, 0, 0]。
22. 创建我们的用户向量:一旦我们有一个代表每个游戏的向量,我们就需要一个聚合用户向量来进行比较。为了实现这一点,我们使用我们的create_user_vector函数,它生成一个与我们的游戏向量一样长度的聚合向量,然后我们可以使用它来生成用户与每个其他游戏之间的类似度分数。
23. 计算类似度:我们在
calculate_user_recommendations中使用步骤3和4中创建的向量,计算余弦类似度分数,该分数范围从0到1,并测量每个游戏与我们用户聚合游戏之间的类似度。
24. 删除旧的推荐:在为用户填充新的推荐之前,我们第一要使用
delete_existing_recommendations删除用户的旧推荐。这只会删除发出请求的用户的推荐,其他用户的推荐保持不变。
25. 填充新的推荐内容:在删除旧的推荐内容后,我们使用 save_recommendations 来填充新的推荐内容。
获取用户游戏
用户游戏
获取所有游戏标签
创建游戏向量
创建用户向量
用户推荐
删除现有的推荐
保存推荐
从sqlalchemy.orm导入会话
从sqlalchemy导入创建引擎,文本
从src.models导入UserGame,UserRecommendation
从sklearn.metrics.pairwise导入余弦类似度
导入pandas作为pd
导入uuid
从typing导入列表
导入日志
# 设置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class
UserRecommendationService:
def __init__(self, db_session: Session, database_url: str):
self.db = db_session
self.database_url = database_url
self.engine = create_engine(database_url)
def fetch_user_games(self, username: str) -> pd.DataFrame:
“””获取特定用户的所有游戏”””
query = text(“SELECT username, appid FROM user_games WHERE username = :username”)
with self.engine.connect() as conn:
result = conn.execute(query, {“username”: username})
data = result.fetchall()
return pd.DataFrame(data, columns=['username', 'appid'])
def fetch_all_category(self) -> pd.DataFrame:
“””获取所有游戏标签”””
query = text(“SELECT appid, category FROM category”)
with self.engine.connect() as conn:
result = conn.execute(query)
data = result.fetchall()
return pd.DataFrame(data, columns=['appid', 'category'])
def create_game_vectors(self, tag_df: pd.DataFrame) -> tuple[pd.DataFrame, List[str], List[str]]:
“””从标签创建游戏向量”””
unique_tags = tag_df['category'].drop_duplicates().sort_values().tolist()
unique_games = tag_df['appid'].drop_duplicates().sort_values().tolist()
game_vectors = []
for game in unique_games:
tags = tag_df[tag_df['appid'] == game]['category'].tolist()
vector = [1 if tag in tags else 0 for tag in unique_tags]
game_vectors.append(vector)
return pd.DataFrame(game_vectors, columns=unique_tags, index=unique_games), unique_tags, unique_games
def create_user_vector(self, user_games_df: pd.DataFrame, game_vectors: pd.DataFrame, unique_tags: List[str]) -> pd.DataFrame:
“””从用户玩过的游戏创建用户向量”””
if user_games_df.empty:
return pd.DataFrame([[0] * len(unique_tags)], columns=unique_tags, index=['unknown_user'])
username = user_games_df.iloc[0]['username']
user_games = user_games_df['appid'].tolist()
# 只保留存在于game_vectors中的游戏
user_games = [g for g in user_games if g in game_vectors.index]
if not user_games:
user_vector = [0] * len(unique_tags)
else:
played_game_vectors = game_vectors.loc[user_games]
user_vector = played_game_vectors.mean(axis=0).tolist()
return pd.DataFrame([user_vector], columns=unique_tags, index=[username])
def
calculate_user_recommendations(self, user_vector: pd.DataFrame, game_vectors: pd.DataFrame, top_n: int = 20) -> pd.DataFrame:
“””计算用户向量与所有游戏向量之间的类似度”””
username = user_vector.index[0]
user_vector_data = user_vector.iloc[0].values.reshape(1, -1)
# 计算类似度
similarities = cosine_similarity(user_vector_data, game_vectors)
similarity_df = pd.DataFrame(similarities.T, index=game_vectors.index, columns=[username])
# 获取前N个推荐
top_games = similarity_df[username].nlargest(top_n)
recommendations = []
for appid, similarity in top_games.items():
recommendations.append({
“username”: username,
“appid”: appid,
“similarity”: float(similarity)
})
return pd.DataFrame(recommendations)
def
delete_existing_recommendations(self, username: str):
“””删除用户的现有推荐”””
self.db.query(UserRecommendation).filter(
UserRecommendation.username == username).delete()
self.db.commit()
def save_recommendations(self, recommendations_df: pd.DataFrame):
“””将新推荐保存到数据库”””
for _, row in
recommendations_df.iterrows():
recommendation = UserRecommendation(
id=uuid.uuid4(),
username=row['username'],
appid=row['appid'],
similarity=row['similarity']
)
self.db.add(recommendation)
self.db.commit()
def
generate_recommendations_for_user(self, username: str, top_n: int = 20):
“””为特定用户生成推荐的主要方法”””
try:
logger.info(f”开始为用户生成推荐: {username}”)
# 1. 获取用户的游戏
user_games_df = self.fetch_user_games(username)
if user_games_df.empty:
logger.warning(f”未找到用户的游戏:{username}”)
return
# 2. 获取所有游戏标签
tag_df = self.fetch_all_category()
if tag_df.empty:
logger.error(“数据库中未找到游戏标签”)
return
# 3. 创建游戏向量
game_vectors, unique_tags, unique_games = self.create_game_vectors(tag_df)
# 4. 创建用户向量
user_vector = self.create_user_vector(user_games_df, game_vectors, unique_tags)
# 5. 计算推荐
recommendations_df =
self.calculate_user_recommendations(user_vector, game_vectors, top_n)
# 6. 删除现有推荐
self.delete_existing_recommendations(username)
# 7. 保存新推荐
self.save_recommendations(recommendations_df)
logger.info(f”成功为用户生成{len(recommendations_df)}条推荐:{username}”)
except Exception as e:
logger.error(f”为用户{username}生成推荐时出错:{str(e)}”)
self.db.rollback()
raise
从sqlalchemy.orm导入会话
从sqlalchemy导入create_engine,文本
从src.models导入UserGame,UserRecommendation
从sklearn.metrics.pairwise导入cosine_similarity
导入pandas作为pd
导入uuid
从typing导入列表
导入日志
# 设置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class
UserRecommendationService:
def __init__(self, db_session: Session, database_url: str):
self.db = db_session
self.database_url = database_url
self.engine = create_engine(database_url)
def fetch_user_games(self, username: str) -> pd.DataFrame:
“””获取特定用户的所有游戏”””
query = text(“SELECT username, appid FROM user_games WHERE username = :username”)
with self.engine.connect() as conn:
result = conn.execute(query, {“username”: username})
data = result.fetchall()
return pd.DataFrame(data, columns=['username', 'appid'])
def fetch_all_category(self) -> pd.DataFrame:
“””获取所有游戏标签”””
query = text(“SELECT appid, category FROM category”)
with self.engine.connect() as conn:
result = conn.execute(query)
data = result.fetchall()
return pd.DataFrame(data, columns=['appid', 'category'])
def create_game_vectors(self, tag_df: pd.DataFrame) -> tuple[pd.DataFrame, List[str], List[str]]:
“””从标签创建游戏向量”””
unique_tags = tag_df['category'].drop_duplicates().sort_values().tolist()
unique_games = tag_df['appid'].drop_duplicates().sort_values().tolist()
game_vectors = []
for game in unique_games:
tags = tag_df[tag_df['appid'] == game]['category'].tolist()
vector = [1 if tag in tags else 0 for tag in unique_tags]
game_vectors.append(vector)
return pd.DataFrame(game_vectors, columns=unique_tags, index=unique_games), unique_tags, unique_games
def create_user_vector(self, user_games_df: pd.DataFrame, game_vectors: pd.DataFrame, unique_tags: List[str]) -> pd.DataFrame:
“””从用户玩过的游戏创建用户向量”””
if user_games_df.empty:
return pd.DataFrame([[0] * len(unique_tags)], columns=unique_tags, index=['unknown_user'])
username = user_games_df.iloc[0]['username']
user_games = user_games_df['appid'].tolist()
# 仅保留存在于game_vectors中的游戏
user_games = [g for g in user_games if g in game_vectors.index]
if not user_games:
user_vector = [0] * len(unique_tags)
else:
played_game_vectors = game_vectors.loc[user_games]
user_vector = played_game_vectors.mean(axis=0).tolist()
return pd.DataFrame([user_vector], columns=unique_tags, index=[username])
def
calculate_user_recommendations(self, user_vector: pd.DataFrame, game_vectors: pd.DataFrame, top_n: int = 20) -> pd.DataFrame:
“””计算用户向量与所有游戏向量之间的类似度”””
username = user_vector.index[0]
user_vector_data = user_vector.iloc[0].values.reshape(1, -1)
# 计算类似度
similarities = cosine_similarity(user_vector_data, game_vectors)
similarity_df = pd.DataFrame(similarities.T, index=game_vectors.index, columns=[username])
# 获取前N个推荐
top_games = similarity_df[username].nlargest(top_n)
recommendations = []
for appid, similarity in top_games.items():
recommendations.append({
“username”: username,
“appid”: appid,
“similarity”: float(similarity)
})
return pd.DataFrame(recommendations)
def
delete_existing_recommendations(self, username: str):
“””删除用户的现有推荐”””
self.db.query(UserRecommendation).filter(
UserRecommendation.username == username).delete()
self.db.commit()
def save_recommendations(self, recommendations_df: pd.DataFrame):
“””将新推荐保存到数据库”””
for _, row in
recommendations_df.iterrows():
recommendation = UserRecommendation(
id=uuid.uuid4(),
username=row['username'],
appid=row['appid'],
similarity=row['similarity']
)
self.db.add(recommendation)
self.db.commit()
def
generate_recommendations_for_user(self, username: str, top_n: int = 20):
“””为特定用户生成推荐的主要方法”””
try:
logger.info(f”为用户生成推荐: {username}”)
# 1. 获取用户的游戏
user_games_df = self.fetch_user_games(username)
if user_games_df.empty:
logger.warning(f”未找到用户的游戏:{username}”)
return
# 2. 获取所有游戏标签
tag_df = self.fetch_all_category()
if tag_df.empty:
logger.error(“数据库中未找到游戏标签”)
return
# 3. 创建游戏向量
game_vectors, unique_tags, unique_games = self.create_game_vectors(tag_df)
# 4. 创建用户向量
user_vector = self.create_user_vector(user_games_df, game_vectors, unique_tags)
# 5. 计算推荐
recommendations_df =
self.calculate_user_recommendations(user_vector, game_vectors, top_n)
# 6. 删除现有推荐
self.delete_existing_recommendations(username)
# 7. 保存新推荐
self.save_recommendations(recommendations_df)
logger.info(f”成功为用户生成 {len(recommendations_df)} 条推荐:{username}”)
except Exception as e:
logger.error(f”为用户 {username} 生成推荐时出错:{str(e)}”)
self.db.rollback()
raise
总结
在本文中,我们介绍了如何设置 PostgreSQL 数据库和 FastAPI 应用程序来运行游戏推荐系统。不过,我们还没有讨论如何将这个系统部署到云服务上,以便让其他人与之交互。关于如何做到这一点的第二部分,请继续阅读第二部分。
