API Reference
fast_grpc
Classes
FastGRPC
FastGRPC(*services, loop=get_event_loop(), port=50051, reflection=False, middlewares=())
Server application.
| PARAMETER | DESCRIPTION |
|---|---|
| *services | Tuple of Fast-gRPC services. |
| loop | Async event loop used to run the server. |
| port | Port on which the server listens for requests. Defaults to 50051. |
| reflection | Flag to enable or disable gRPC server reflection. Defaults to False. |
| middlewares | Tuple of middlewares (interceptors). |
Example
    from fast_grpc import FastGRPC, FastGRPCService, grpc_method

    class ExampleService(FastGRPCService):
        ...

    app = FastGRPC(ExampleService())
    app.run()
Functions
add_service
add_service(service)
Add a service to the server.
| PARAMETER | DESCRIPTION |
|---|---|
| service | gRPC service. |
Example
app = FastGRPC()
app.add_service(ExampleService())
StatusCode
FastGRPCMiddleware
Middleware.
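The reference does not spell out the middleware interface, so the following is only a generic sketch of the interceptor idea, written with the standard library alone. The names `apply_middlewares`, `logging_middleware`, `upper_middleware`, and `echo` are hypothetical, and the ordering shown (first middleware in the tuple runs outermost) is an assumption, not documented behavior of `FastGRPCMiddleware`.

```python
import asyncio

# Generic illustration only: this is NOT the fast_grpc middleware interface.
# Here a "middleware" is any coroutine taking (next_handler, request).


def _bind(middleware, next_handler):
    """Return a handler that routes the request through one middleware."""
    async def call(request):
        return await middleware(next_handler, request)
    return call


def apply_middlewares(handler, middlewares):
    """Wrap handler so the first middleware in the tuple runs outermost (assumed order)."""
    wrapped = handler
    for middleware in reversed(middlewares):
        wrapped = _bind(middleware, wrapped)
    return wrapped


calls = []  # records the order in which the pieces run


async def logging_middleware(next_handler, request):
    calls.append("log:before")
    response = await next_handler(request)
    calls.append("log:after")
    return response


async def upper_middleware(next_handler, request):
    calls.append("upper")
    return (await next_handler(request)).upper()


async def echo(request):
    calls.append("handler")
    return f"echo {request}"


pipeline = apply_middlewares(echo, (logging_middleware, upper_middleware))
result = asyncio.run(pipeline("ping"))
```

In this sketch each middleware decides when, and whether, to call the next handler, which is what lets an interceptor add logging, authentication, or response rewriting around a request handler.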
FastGRPCService
Implementation of gRPC service.
Attributes
Functions
get_service_name
get_service_name()
Method for getting the full service name from pb2.
| RETURNS | DESCRIPTION |
|---|---|
| str | Full name string of the service from pb2. |
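For context, a protobuf service's full name is its proto package joined to the service name with a dot, and gRPC uses that full name in the request path. The helpers below are a small standalone sketch of that convention; `full_service_name` and `method_path` are hypothetical names and are not part of fast_grpc.

```python
def full_service_name(package: str, service: str) -> str:
    """Join a proto package and a service name; a service without a package keeps its bare name."""
    return f"{package}.{service}" if package else service


def method_path(package: str, service: str, method: str) -> str:
    """Build the request path used by gRPC: /<full service name>/<method name>."""
    return f"/{full_service_name(package, service)}/{method}"


# Hypothetical package and service names, used purely for illustration.
name = full_service_name("example.v1", "ExampleService")
path = method_path("example.v1", "ExampleService", "Ping")
```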
get_proto
classmethod
get_proto()
Render and return content of proto file for this gRPC service.
| RETURNS | DESCRIPTION |
|---|---|
| str | Protobuf file content string. |
from_proto
classmethod
from_proto(proto_file, grpc_path=cwd())
Create a gRPC service interface from a proto file.
| RETURNS | DESCRIPTION |
|---|---|
| type[Self] | New service class, based on FastGRPCService. |
Functions
grpc_method
grpc_method(function=None, /, name=None, request_model=None, response_model=None, middlewares=(), disable=False)
Decorator that marks a method as a gRPC method.
| PARAMETER | DESCRIPTION |
|---|---|
| function | Original request handler. |
| name | Name of the gRPC method. |
| request_model | Model describing the request data. |
| response_model | Model describing the response data. |
| middlewares | Iterable of middlewares. |
| disable | Flag to enable or disable the gRPC method. |
Example
    from fast_grpc import FastGRPCService, grpc_method
    from pydantic import BaseModel

    class ExampleService(FastGRPCService):
        @grpc_method
        async def ping(self, request: BaseModel) -> BaseModel:
            ...

        @grpc_method(
            name="isHealth",
            request_model=BaseModel,
            response_model=BaseModel,
            middlewares=(),
            disable=False,
        )
        async def is_health(self, request, context):
            ...
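The signature `grpc_method(function=None, /, ...)` suggests the usual Python pattern for a decorator that works both bare (`@grpc_method`) and with arguments (`@grpc_method(name=...)`). The sketch below shows that pattern in isolation using a hypothetical stand-in called `rpc_method`; the attribute names `__rpc_name__` and `__rpc_disabled__` are invented for illustration and say nothing about how fast_grpc stores its metadata.

```python
def rpc_method(function=None, /, name=None, disable=False):
    """Hypothetical stand-in for a decorator usable with or without arguments."""

    def decorate(func):
        # Attach illustrative metadata that a server could inspect later.
        func.__rpc_name__ = name or func.__name__
        func.__rpc_disabled__ = disable
        return func

    # Bare form: "@rpc_method" passes the decorated function positionally.
    if function is not None:
        return decorate(function)
    # Called form: "@rpc_method(name=...)" returns the actual decorator.
    return decorate


class Demo:
    @rpc_method
    async def ping(self, request):
        return request

    @rpc_method(name="isHealth", disable=True)
    async def is_health(self, request):
        return request
```

Making `function` positional-only means a keyword argument can never be mistaken for the decorated function, which keeps the two call forms unambiguous.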