Skip to main content

接口认证

API(应用程序编程接口)是一组规则,用于定义应用程序或设备如何相互连接和通信。

最常用的是 REST API,它是一种使用 HTTP 请求访问和使用数据的技术。

GraphQL 是一种新兴的 API 技术,它提供了一种更高效、强大和灵活的选择。

REST

REST API 是符合 REST(表述性状态转移)架构样式设计原则的 API。 因此,REST API 有时被称为 RESTful API。

EST API 几乎可以使用任何编程语言进行开发,并支持多种数据格式。 唯一的要求是它们要符合以下六个 REST 设计原则 - 也称为架构约束:

  • 统一接口。 无论请求来自何处,对同一资源发出的所有 API 请求都应该看起来相同。 REST API 应确保同一条数据(例如用户的姓名或电子邮件地址)仅属于一个统一资源标识符 (URI)。 资源不应过大,但应包含客户可能需要的每一条信息。
  • 客户端/服务器解耦。 在 REST API 设计中,客户端和服务器应用程序必须彼此完全独立。 客户端应用程序只需知道所请求资源的 URI 即可; 它不能以任何其他方式与服务器应用程序交互。 同样,除了通过 HTTP 将客户端应用程序传递到所请求的数据外,服务器应用程序不应修改客户端应用程序。
  • 无状态。 REST API 是无状态的,这意味着每个请求都需要包含处理它所需的全部信息。 换句话说,REST API 不需要任何服务器端会话。 不允许服务器应用程序存储与客户端请求相关的任何数据。
  • 可缓存性。 如果可能,资源应该可以在客户端或服务器端缓存。 服务器响应还需要包含有关是否允许对交付的资源进行缓存的信息。 目标是提高客户端的性能,同时增强服务器端的可扩展性。
  • 分层系统架构。 在 REST API 中,调用和响应都会经过多个不同的层。 根据经验,请不要将客户端和服务器应用程序直接相互连接。 通信环路中可能包含多个不同的中介服务器。 需要设计 REST API,让客户端和服务器都无法判断它是与最终应用程序还是中介服务器进行通信。
  • 按需编码(可选)。 REST API 通常发送静态资源,但在某些情况下,响应也可以包含可执行代码(例如 Java 小程序)。 在这些情况下,代码只应按需运行。

REST API 通过 HTTP 请求进行通信,以便执行标准数据库功能,例如在资源中创建、读取、更新和删除记录(这些操作也称为 CRUD)。 例如,REST API 可使用 GET 请求检索记录、使用 POST 请求创建记录、使用 PUT 请求更新记录以及使用 DELETE 请求删除记录。 可以在 API 调用中使用所有 HTTP 方法。 精心设计的 REST API 类似于在具有内置 HTTP 功能的 Web 浏览器中运行的 Web 站点。 任何特定时刻或时间戳的资源状态称为资源表示。 几乎可以采用任何格式将该信息传递到客户端,包括 JavaScript 对象表示法 (JSON)、HTML、XLT、Python、PHP 或纯文本。 JSON 非常受欢迎,因为它可以被人类和机器读取,而且与编程语言无关。 在 REST API 调用中,请求头和参数也很重要,因为它们包含重要的标识信息,例如元数据、授权、统一资源标识符 (URI)、缓存、cookie 等。 在精心设计的 REST API 中会使用请求头、响应头和常规 HTTP 状态码。

怎么使用 REST风格的API?

from fastapi import FastAPI
import uvicorn
import random
from fastapi.responses import HTMLResponse

app = FastAPI()

# 返回一个1-100的随机数
@app.get("/random")
def get_random_num():
num = random.randint(1, 100)
return {"random": num}

# 返回一个html页面
@app.get("/all", response_class=HTMLResponse)
def get_all():
html = """<html></html>"""
return html
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=6022)

FastAPI示例

FastAPI 除了可以传输文本数据,还可传输图片、视频等数据,cv2 可以捕获屏幕,将两者结合起来,实现内网直播功能,可以在局域网内通过浏览器观看屏幕共享。

import os
from importlib import import_module
from flask import Flask, render_template, Response,render_template_string


from io import BytesIO
import cv2
from PIL import ImageGrab, Image
import time
import threading
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident


class CameraEvent(object):
def __init__(self):
self.events = {}

def wait(self):
ident = get_ident()
if ident not in self.events:
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait()

def set(self):
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].isSet():
event[0].set()
event[1] = now
else:
if now - event[1] > 5:
remove = ident
if remove:
del self.events[remove]

def clear(self):
self.events[get_ident()][0].clear()


class BaseCamera(object):
thread = None
frame = None
last_access = 0
event = CameraEvent()

def __init__(self):
if BaseCamera.thread is None:
BaseCamera.last_access = time.time()

BaseCamera.thread = threading.Thread(target=self._thread)
BaseCamera.thread.start()

while self.get_frame() is None:
time.sleep(0)

def get_frame(self):
BaseCamera.last_access = time.time()

BaseCamera.event.wait()
BaseCamera.event.clear()

return BaseCamera.frame

@staticmethod
def frames():
raise RuntimeError('Must be implemented by subclasses.')

@classmethod
def _thread(cls):
print('Starting camera thread.')
frames_iterator = cls.frames()
for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set()
time.sleep(0)
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
print('Stopping camera thread due to inactivity.')
break
BaseCamera.thread = None



class Camera(BaseCamera):
video_source = 0

@staticmethod
def set_video_source(source):
Camera.video_source = source

@staticmethod
def frames():
camera = cv2.VideoCapture(Camera.video_source)
if not camera.isOpened():
raise RuntimeError('Error')

while True:
image = ImageGrab.grab() # 获取屏幕数据
# w, h = image.size
image = image.resize((1366, 750), Image.ANTIALIAS) # 图片缩放
output_buffer = BytesIO() # 创建二进制对象
image.save(output_buffer, format='JPEG', quality=100) # quality提升图片分辨率
frame = output_buffer.getvalue() # 获取二进制数据
yield frame # 生成器返回一张图片的二进制数据






app = Flask(__name__)


@app.route('/')
def index():
"""
视图函数
:return:
"""
return render_template_string('''<html>

<head>
<title>屏幕共享</title>
</head>

<body>
<img src="{{ url_for('video_feed') }}">
</body>

</html>''')


def gen(camera):
"""
流媒体发生器
"""
while True:
frame = camera.get_frame()

yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')


@app.route('/video_feed')
def video_feed():
"""流媒体数据"""
return Response(gen(Camera()),
mimetype='multipart/x-mixed-replace; boundary=frame')


if __name__ == '__main__':
ip_host = '127.0.0.1'# 本机ip地址
ip_host2 = '0.0.0.0'# 内网ip地址
app.run(threaded=True, host=ip_host2, port=80)

Graphql

graphql 是一种用于 API 的查询语言,对你的 API 中的数据提供了一套易于理解的完整描述,使得客户端能够准确地获得它需要的数据,减少数据的冗余,系统中的所有入口点(REST、GraphQL 和 RPC)都将使用相同的验证、授权和错误处理规则进行处理。

GraphQL 同样支持 Relay, Django, SQLAlchemy, Google App Engine.

怎么使用 Graphql风格的API?

安装对应的模块

pip install graphene
pip install strawberry-graphql
pip install ariadne

在 py 文件中可以运行这个简单的示例

import graphene

class Query(graphene.ObjectType):
hello = graphene.String(name=graphene.String(default_value="World"))

def resolve_hello(self, info, name):
return 'Hello ' + name

schema = graphene.Schema(query=Query)
result = schema.execute('{ hello }')
print(result.data['hello']) # "Hello World"

graphene示例

import graphene

# 定义 Person 类型
class Person(graphene.ObjectType):
id = graphene.ID()
name = graphene.String()
age = graphene.Int()

# 定义查询类型
class Query(graphene.ObjectType):
# 定义查询字段,用于获取所有人的信息
all_people = graphene.List(Person)

# 实现查询字段的解析器
def resolve_all_people(self, info):
# 在这个简单的例子中,我们返回一个包含一些硬编码数据的人员列表
return [
Person(id=1, name="John Doe", age=30),
Person(id=2, name="Jane Smith", age=25),
Person(id=3, name="Bob Johnson", age=35),
]

# 创建 schema,将 Query 类型添加到根 schema 中
schema = graphene.Schema(query=Query)

# 通过 GraphQL 查询获取结果
query_string = '{ allPeople { id name age } }'
result = schema.execute(query_string)

# 打印结果
print(result.data)
# {'allPeople': [{'id': '1', 'name': 'John Doe', 'age': 30}, {'id': '2', 'name': 'Jane Smith', 'age': 25}, {'id': '3', 'name': 'Bob Johnson', 'age': 35}]}