利用gRPC构建Python微服务(三)——实战Python gRPC
bigegpt 2024-11-24 12:00 4 浏览
在上一篇中,主要向大家介绍了gRPC的一些基础概念,以及基本的定义方式,本篇将进入实战教程,如何用Python实现服务端和客户端,通过gRPC进行交互。
全文导航
为了方便大家阅读,这里将全部目录进行一下索引,方便大家在老孙正经胡说(https://sunqi.site)中查看相关文章:
- 利用gRPC构建Python微服务(一)——关于微服务(https://sunqi.site/posts/python-microservices-grpc-1/)
- 利用gRPC构建Python微服务(二)——gRPC基础(https://sunqi.site/posts/python-microservices-grpc-2/)
- 利用gRPC构建Python微服务(三)——实战Python gRPC(https://sunqi.site/posts/python-microservices-grpc-3/)
- 利用gRPC构建Python微服务(四)——在Kubernetes中部署(https://sunqi.site/posts/python-microservices-grpc-4/)
- 利用gRPC构建Python微服务(五)——微服务可观测性(https://sunqi.site/posts/python-microservices-grpc-5/)
- 利用gRPC构建Python微服务(六)——Python gRPC最佳实践(https://sunqi.site/posts/python-microservices-grpc-6/)
- 利用gRPC构建Python微服务(七)——AsyncIO和gRPC(https://sunqi.site/posts/python-microservices-grpc-7/)
代码样例
现在正式进入实现部分,我们来看看protoco buffers能做什么?protobufs是简称,后续会大量出现。
正如上面提到的,你可以从protobufs生成Python代码。这个工作是作为grpcio-tools包的一部分。
首先,定义你的初始目录结构:
.
├── protobufs/
│ └── recommendations.proto
|
└── recommendations/
protobufs包含recommendations.proto文件,这是我们上面定义的部分。
你将在recommendations目录生成Python代码。首先,你需要安装grpcio-tools,创建recommendations/requirements.txt
grpcio-tools ~= 1.30
可以创建一个virtualenv环境来运行后续代码,在虚拟环境中安装依赖。
virtualenv venv
source venv/bin/activate
python -m pip install -r requirements.txt
从protobufs生成代码
$ cd recommendations
$ python -m grpc_tools.protoc -I ../protobufs --python_out=. \
--grpc_python_out=. ../protobufs/recommendations.proto
此处将生成两个文件
$ ls
recommendations_pb2.py recommendations_pb2_grpc.py
这些文件包含与API通讯的Python类型和函数。编译器生成客户端代码并调用RPC和Server端代码实现远程调用。我们先来看一下客户端代码部分。
RPC客户端
这里生成的代码可读性并不高
>>> from recommendations_pb2 import BookCategory, RecommendationRequest
>>> request = RecommendationRequest(
... user_id=1, category=BookCategory.SCIENCE_FICTION, max_results=3
... )
>>> request.category
1
protobuf编译器生成了与你的protobuf类型对应的 Python 类型。到目前为止,一切顺利。你还可以看到一些类型的检查字段:
>>> request = RecommendationRequest(
... user_id="oops", category=BookCategory.SCIENCE_FICTION, max_results=3
... )
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: 'oops' has type str, but expected one of: int, long
如果传入错误字段类型,则抛出TypeError。
在proto3中所有字段是可选的,所以你要校验是否正确设置所有值。如果其中一个未提供,如果为数值类型则默认为0,如果是字符类型默认为空:
>>> request = RecommendationRequest(
... user_id=1, category=BookCategory.SCIENCE_FICTION
... )
>>> request.max_results
0
因为没有设置int字段的默认值,所以返回0。
虽然protobufs为你进行了初步检查,但是仍然需要从自身业务角度验证数据的有效性,其实无论使用何种实现方式,这都是必须要做的。
生成的recommendations_pb2.py文件包含了类型定义,而recommendations_pb2_grpc.py文件则包含了客户端和服务端基本通讯框架。以下就是client在使用时需要的引入:
>>> import grpc
>>> from recommendations_pb2_grpc import RecommendationsStub
引入grpc是为了设置与远程服务的链接。之后导入RPC客户端stub,之所以叫做stub是因为客户端本身并不包含任何功能。只是调用远程服务并传回结果。
如果你查看protobuf的定义,你会看到在service Recommendations的定义最后。protobuf的编译器使用Recommendations,并且追加从客户端的名称Stub,就构成了RecommendationStub
service Recommendations {
rpc Recommend (RecommendationRequest) returns (RecommendationResponse);
}
现在你可以发送RPC请求了
>>> channel = grpc.insecure_channel("localhost:50051")
>>> client = RecommendationsStub(channel)
>>> request = RecommendationRequest(
... user_id=1, category=BookCategory.SCIENCE_FICTION, max_results=3
... )
>>> client.Recommend(request)
Traceback (most recent call last):
grpc._chanel._inactivRpcError: <_InactiveRpcError...
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
...
在代码示例中,你尝试使用非授权和非加密的方式连接本地的50051端口,这是grpc的标准端口。你可以将channel传给你的stub并实例化你的client。
现在可以调用在你微服务Recommendations中定义的Recommend方法。在protobuf中25行定义的内容:rpc Recommend (…) returns (…)。这就是Recommend方法所在。程序运行异常是由于本地服务中并没有真正运行50051的微服务,后续会实现。
下面来看一下服务端实现。
RPC服务端
打开两个终端,一个用于调试客户端,一个用于调试服务端。从基本导入和数据开始:
# recommendations/recommendations.py
from concurrent import futures
import random
import grpc
from recommendations_pb2 import (
BookCategory,
BookRecommendation,
RecommendationResponse,
)
import recommendations_pb2_grpc
books_by_category = {
BookCategory.MYSTERY: [
BookRecommendation(id=1, title="The Maltese Falcon"),
BookRecommendation(id=2, title="Murder on the Orient Express"),
BookRecommendation(id=3, title="The Hound of the Baskervilles"),
],
BookCategory.SCIENCE_FICTION: [
BookRecommendation(
id=4, title="The Hitchhiker's Guide to the Galaxy"
),
BookRecommendation(id=5, title="Ender's Game"),
BookRecommendation(id=6, title="The Dune Chronicles"),
],
BookCategory.SELF_HELP: [
BookRecommendation(
id=7, title="The 7 Habits of Highly Effective People"
),
BookRecommendation(
id=8, title="How to Win Friends and Influence People"
),
BookRecommendation(id=9, title="Man's Search for Meaning"),
],
}
示例中导入了grpc的依赖并创建了一些示例数据,下面是详细的解释:
- 第二行:导入futures是因为gRPC运行需要线程池
- 第三行:导入random是为了随机生成一些推荐的图书
- 第十四行:创建books_by_categry字典,在实际的微服务项目中,这些是存在数据库中
下一步,需要创建一个类实现微服务的方法
class RecommendationService(
recommendations_pb2_grpc.RecommendationsServicer
):
def Recommend(self, request, context):
if request.category not in books_by_category:
context.abort(grpc.StatusCode.NOT_FOUND, "Category not found")
books_for_category = books_by_category[request.category]
num_results = min(request.max_results, len(books_for_category))
books_to_recommend = random.sample(
books_for_category, num_results
)
return RecommendationResponse(recommendations=books_to_recommend)
创建一个类实现Recommend RPC,以下是详细解释:
- 第29行,定义了RecommendationService类。这是你微服务的实现。注意子类RecommendationsServicer是实现gRPC必须要做的。
- 第32行,定义了Recommend()方法,这个名字必须和protobuf文件中定义的一样。方法中接受RecommendationRequest并且返回RecommendationResponse,和protobuf定义内容一致。content参数可以用于定义返回的状态值。
- 第33到34行,如果你得到一个未知的种类,则使用abort()方法结束请求,并将状态码设置为NOT_FOUND。由于gRPC是基于HTTP/2创建的,状态码类似标准的HTTP状态码。这样可以针对接受进行不同的处理。同时允许中间件,类似监控系统,记录有多少请求包含错误。
- 第36到40行,随机挑选一些指定种类的书。使用max_results约束最大推荐数量。使用min控制超出的范围,否则random.sample将抛出异常。
- 第38行,返回RecommendationResponses对象,包含推荐的图书。
另外一点建议,最好用raise exception方式处理异常,而不是像例子中的abort(),但是返回的状态码可能不能被正确设置,后续章节会介绍如何规避这个问题。
RecommendationService定义了微服务实现,你还是需要运行。示例如下:
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
recommendations_pb2_grpc.add_RecommendationsServicer_to_server(
RecommendationService(), server
)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
if __name__ == "__main__":
serve()
serve()启动了一个网络服务,用于微服务处理请求:
- 第42行,创建一个gRPC服务,使用10个现成提供服务,对于这个例子有点太多了,但是对于实际上的服务,这是一个比较推荐的默认值
- 第43行,将类关联的服务中,即服务的响应
- 第46行,将服务监听50051端口,这是gRPC的默认短开口,但是可以替换成你需要的
- 第47和48行,server.start()和server.wait_for_termination()用于启动服务并等待直到停止。唯一暂停服务的方法是在中断中使用CTRL + C。但是在生产环境中,还是需要更好的方法停止,后面会介绍到。
运行测试
终端中运行:
$ python recommendations.py
如果你的终端仍然打开,可以在console中输入:
>>> import grpc
>>> from recommendations_pb2_grpc import RecommendationsStub
>>> channel = grpc.insecure_channel("localhost:50051")
>>> client = RecommendationsStub(channel)
发送一个请求:
>>> request = RecommendationRequest(
... user_id=1, category=BookCategory.SCIENCE_FICTION, max_results=3)
>>> client.Recommend(request)
recommendations {
id: 6
title: "The Dune Chronicles"
}
recommendations {
id: 4
title: "The Hitchhiker\'s Guide To The Galaxy"
}
recommendations {
id: 5
title: "Ender\'s Game"
}
成功了,你发送了一个RPC请求到你的微服务中,并得到一个请求。每次返回结果不同是因为随机选择。
现在你的服务端已经实现,可以尝试实现Marketplace微服务,并让他调用Recommendations微服务。保留Recommendations微服务运行,其他窗口可以关闭。
实现Marketplace
创建新的marketplace目录,并添加marketplace.py文件,现在目录结构为:
.
├── marketplace/
│ ├── marketplace.py
│ ├── requirements.txt
│ └── templates/
│ └── homepage.html
|
├── protobufs/
│ └── recommendations.proto
|
└── recommendations/
├── recommendations.py
├── recommendations_pb2.py
├── recommendations_pb2_grpc.py
└── requirements.txt
marketplace下包含多个文件,可以先创建空文件,后续逐步实现。
现在可以开始实现微服务代码了。Marketplace微服务是一个Flask应用,为用户提供页面。他调用Recommendations为辅获取图书推荐并在页面展现。
在marketplace/marketplace.py中添加如下内容:
# marketplace/marketplace.py
import os
from flask import Flask, render_template
import grpc
from recommendations_pb2 import BookCategory, RecommendationRequest
from recommendations_pb2_grpc import RecommendationsStub
app = Flask(__name__)
recommendations_host = os.getenv("RECOMMENDATIONS_HOST", "localhost")
recommendations_channel = grpc.insecure_channel(
f"{recommendations_host}:50051"
)
recommendations_client = RecommendationsStub(recommendations_channel)
@app.route("/")
def render_homepage():
recommendations_request = RecommendationRequest(
user_id=1, category=BookCategory.MYSTERY, max_results=3
)
recommendations_response = recommendations_client.Recommend(
recommendations_request
)
return render_template(
"homepage.html",
recommendations=recommendations_response.recommendations,
)
这里创建了一个Flask应用和gRPC客户端,增加了一个方法渲染主页。代码详情:
- 第10行,创建Flask应用并为用户渲染网页
- 第12到16行,创建gRPC channel和stub
- 第20到30行,用户访问时,调用render_homepage()。通过模板返回HTML页面
在这个示例中,你创建了一个全局性的gRPC channel和stub。通常来说,global是不推荐使用的,但是这个示例是个例外。 gRPC channel保持一个持久性连接,避免多次重建连接。满足并发性需要,连接断掉后自动重建。但是,如果你每次是在请求时建立新的channel,然后Python再回收,无法获得长连接的最大优势。 你需要保持channel打开,并且不需要每次重新连接recommendations微服务。你能将channel隐藏在其他模块中,但是本地中只有一个文件,所以为了简单,使用了全局定义。
打开marketplace/tempaltes/homepage.html文件,添加如下内容:
<!-- homepage.html -->
<!doctype html>
<html lang="en">
<head>
<title>Online Books For You</title>
</head>
<body>
<h1>Mystery books you may like</h1>
<ul>
{% for book in recommendations %}
<li>{{ book.title }}</li>
{% endfor %}
</ul>
</body>
这仅是一个demo页面。全部完成后,应该显示推荐图书的清单。
如果想执行代码,你需要添加依赖,在你的marketplace/requiirements.txt中添加:
flask ~= 1.1
grpcio-tools ~= 1.30
Jinja2 ~= 2.11
pytest ~= 5.4
为marketplace生成protbufs
$ cd marketplace
$ python -m grpc_tools.protoc -I ../protobufs --python_out=. \
--grpc_python_out=. ../protobufs/recommendations.proto
运行Marketplace微服务
$ FLASK_APP=marketplace.py flask run
执行Flask app,默认端口为5000
相关推荐
- 悠悠万事,吃饭为大(悠悠万事吃饭为大,什么意思)
-
新媒体编辑:杜岷赵蕾初审:程秀娟审核:汤小俊审签:周星...
- 高铁扒门事件升级版!婚宴上‘冲喜’老人团:我们抢的是社会资源
-
凌晨两点改方案时,突然收到婚庆团队发来的视频——胶东某酒店宴会厅,三个穿大红棉袄的中年妇女跟敢死队似的往前冲,眼瞅着就要扑到新娘的高额钻石项链上。要不是门口小伙及时阻拦,这婚礼造型团队熬了三个月的方案...
- 微服务架构实战:商家管理后台与sso设计,SSO客户端设计
-
SSO客户端设计下面通过模块merchant-security对SSO客户端安全认证部分的实现进行封装,以便各个接入SSO的客户端应用进行引用。安全认证的项目管理配置SSO客户端安全认证的项目管理使...
- 还在为 Spring Boot 配置类加载机制困惑?一文为你彻底解惑
-
在当今微服务架构盛行、项目复杂度不断攀升的开发环境下,SpringBoot作为Java后端开发的主流框架,无疑是我们手中的得力武器。然而,当我们在享受其自动配置带来的便捷时,是否曾被配置类加载...
- Seata源码—6.Seata AT模式的数据源代理二
-
大纲1.Seata的Resource资源接口源码2.Seata数据源连接池代理的实现源码3.Client向Server发起注册RM的源码4.Client向Server注册RM时的交互源码5.数据源连接...
- 30分钟了解K8S(30分钟了解微积分)
-
微服务演进方向o面向分布式设计(Distribution):容器、微服务、API驱动的开发;o面向配置设计(Configuration):一个镜像,多个环境配置;o面向韧性设计(Resista...
- SpringBoot条件化配置(@Conditional)全面解析与实战指南
-
一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...
- 一招解决所有依赖冲突(克服依赖)
-
背景介绍最近遇到了这样一个问题,我们有一个jar包common-tool,作为基础工具包,被各个项目在引用。突然某一天发现日志很多报错。一看是NoSuchMethodError,意思是Dis...
- 你读过Mybatis的源码?说说它用到了几种设计模式
-
学习设计模式时,很多人都有类似的困扰——明明概念背得滚瓜烂熟,一到写代码就完全想不起来怎么用。就像学了一堆游泳技巧,却从没下过水实践,很难真正掌握。其实理解一个知识点,就像看立体模型,单角度观察总...
- golang对接阿里云私有Bucket上传图片、授权访问图片
-
1、为什么要设置私有bucket公共读写:互联网上任何用户都可以对该Bucket内的文件进行访问,并且向该Bucket写入数据。这有可能造成您数据的外泄以及费用激增,若被人恶意写入违法信息还可...
- spring中的资源的加载(spring加载原理)
-
最近在网上看到有人问@ContextConfiguration("classpath:/bean.xml")中除了classpath这种还有其他的写法么,看他的意思是想从本地文件...
- Android资源使用(android资源文件)
-
Android资源管理机制在Android的开发中,需要使用到各式各样的资源,这些资源往往是一些静态资源,比如位图,颜色,布局定义,用户界面使用到的字符串,动画等。这些资源统统放在项目的res/独立子...
- 如何深度理解mybatis?(如何深度理解康乐服务质量管理的5个维度)
-
深度自定义mybatis回顾mybatis的操作的核心步骤编写核心类SqlSessionFacotryBuild进行解析配置文件深度分析解析SqlSessionFacotryBuild干的核心工作编写...
- @Autowired与@Resource原理知识点详解
-
springIOCAOP的不多做赘述了,说下IOC:SpringIOC解决的是对象管理和对象依赖的问题,IOC容器可以理解为一个对象工厂,我们都把该对象交给工厂,工厂管理这些对象的创建以及依赖关系...
- java的redis连接工具篇(java redis client)
-
在Java里,有不少用于连接Redis的工具,下面为你介绍一些主流的工具及其特点:JedisJedis是Redis官方推荐的Java连接工具,它提供了全面的Redis命令支持,且...
- 一周热门
- 最近发表
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- resize函数 (64)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- mybatis大于等于 (64)
- xcode-select (66)
- mysql授权 (74)
- 下载测试 (70)
- linuxlink (65)
- pythonwget (67)
- androidinclude (65)
- logstashinput (65)
- hadoop端口 (65)
- vue阻止冒泡 (67)
- oracle时间戳转换日期 (64)
- jquery跨域 (68)
- php写入文件 (73)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)