Files
sign-doc/python/api_signer.py

459 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API签名工具
提供API请求签名与验证功能支持多种签名算法。
"""
import hashlib
import hmac
import re
import time
import urllib.parse
from enum import Enum, auto
from typing import Dict, List, Optional, Tuple, Union
class SignatureAlgorithm(Enum):
"""签名算法类型"""
MD5 = auto()
SHA1 = auto()
SHA256 = auto()
HMAC_SHA256 = auto()
@staticmethod
def from_str(algorithm_str: str) -> 'SignatureAlgorithm':
"""从字符串解析算法类型"""
algorithm_str = algorithm_str.upper()
if algorithm_str == 'MD5':
return SignatureAlgorithm.MD5
elif algorithm_str == 'SHA1':
return SignatureAlgorithm.SHA1
elif algorithm_str == 'SHA256':
return SignatureAlgorithm.SHA256
elif algorithm_str in ('HMAC_SHA256', 'HMACSHA256', 'HMAC-SHA256'):
return SignatureAlgorithm.HMAC_SHA256
else:
raise ValueError(f"无效的签名算法: {algorithm_str}")
def __str__(self) -> str:
if self == SignatureAlgorithm.MD5:
return "MD5"
elif self == SignatureAlgorithm.SHA1:
return "SHA1"
elif self == SignatureAlgorithm.SHA256:
return "SHA256"
elif self == SignatureAlgorithm.HMAC_SHA256:
return "HMAC-SHA256"
return "UNKNOWN"
class SignOptions:
"""签名选项"""
def __init__(
self,
algorithm: SignatureAlgorithm = SignatureAlgorithm.MD5,
key_name: str = "AccessKeyId",
channel_id_name: str = "channelId",
timestamp_name: str = "timestamp",
nonce_name: str = "nonce",
signature_name: str = "sign"
):
"""
初始化签名选项
Args:
algorithm: 签名算法
key_name: AccessKeyId参数名
channel_id_name: 合作渠道方ID参数名
timestamp_name: 时间戳参数名
nonce_name: 随机字符串参数名
signature_name: 签名参数名
"""
self.algorithm = algorithm
self.key_name = key_name
self.channel_id_name = channel_id_name
self.timestamp_name = timestamp_name
self.nonce_name = nonce_name
self.signature_name = signature_name
class ApiSigner:
"""API签名工具"""
def __init__(self, options: Optional[SignOptions] = None):
"""
初始化签名工具
Args:
options: 签名选项如为None则使用默认选项
"""
self.options = options if options is not None else SignOptions()
def generate_nonce(self) -> str:
"""
生成随机字符串
Returns:
一个基于当前时间的随机字符串
"""
return f"{int(time.time() * 1000)}{int(time.time()) % 1000}"
def get_timestamp(self) -> int:
"""
获取当前时间戳(毫秒)
Returns:
当前的Unix时间戳毫秒
"""
return int(time.time() * 1000)
def sign_request(
self,
params: Dict[str, str],
access_key_id: str,
secret_key: str,
channel_id: str
) -> Dict[str, str]:
"""
对请求进行签名
Args:
params: 请求参数
access_key_id: 访问密钥ID
secret_key: 密钥
channel_id: 合作渠道方ID
Returns:
添加了签名的完整参数
"""
# 创建用于签名的参数副本
signed_params = params.copy()
# 添加认证参数
timestamp = self.get_timestamp()
signed_params[self.options.key_name] = access_key_id
signed_params[self.options.channel_id_name] = channel_id
signed_params[self.options.timestamp_name] = str(timestamp)
signed_params[self.options.nonce_name] = self.generate_nonce()
# 计算签名
signature = self.calculate_signature(signed_params, secret_key)
# 添加签名到参数
signed_params[self.options.signature_name] = signature
return signed_params
def sign_url(
self,
base_url: str,
params: Dict[str, str],
access_key_id: str,
secret_key: str,
channel_id: str
) -> str:
"""
对URL进行签名
Args:
base_url: 基础URL地址
params: 请求参数
access_key_id: 访问密钥ID
secret_key: 密钥
channel_id: 合作渠道方ID
Returns:
添加了签名的完整URL
"""
# 对请求签名
signed_params = self.sign_request(params, access_key_id, secret_key, channel_id)
# 构建URL查询字符串
query_string = urllib.parse.urlencode(signed_params)
# 处理已有参数的URL
if '?' in base_url:
return f"{base_url}&{query_string}"
else:
return f"{base_url}?{query_string}"
def calculate_signature(self, params: Dict[str, str], secret_key: str) -> str:
"""
计算签名
Args:
params: 请求参数
secret_key: 密钥
Returns:
签名字符串
"""
# 按键名排序并生成规范化的请求字符串
signing_string = self.create_signing_string(params)
# 添加密钥
signing_string = f"{signing_string}&key={secret_key}"
# 使用指定算法计算签名
if self.options.algorithm == SignatureAlgorithm.MD5:
return hashlib.md5(signing_string.encode('utf-8')).hexdigest()
elif self.options.algorithm == SignatureAlgorithm.SHA1:
return hashlib.sha1(signing_string.encode('utf-8')).hexdigest()
elif self.options.algorithm == SignatureAlgorithm.SHA256:
return hashlib.sha256(signing_string.encode('utf-8')).hexdigest()
elif self.options.algorithm == SignatureAlgorithm.HMAC_SHA256:
return hmac.new(
secret_key.encode('utf-8'),
signing_string.encode('utf-8'),
hashlib.sha256
).hexdigest()
else:
return hashlib.md5(signing_string.encode('utf-8')).hexdigest()
def create_signing_string(self, params: Dict[str, str]) -> str:
"""
创建用于签名的规范化字符串
Args:
params: 请求参数
Returns:
按键名排序并拼接的字符串
"""
# 排除签名参数
sorted_params = {k: v for k, v in params.items() if k != self.options.signature_name}
# 获取所有键并排序
keys = sorted(sorted_params.keys())
# 构建签名字符串
parts = []
for key in keys:
value = sorted_params[key]
# 仅对非字母数字字符进行URL编码
if self.needs_url_encode(value):
value = urllib.parse.quote(value, safe='')
parts.append(f"{key}={value}")
return '&'.join(parts)
def needs_url_encode(self, s: str) -> bool:
"""
判断是否需要对字符串进行URL编码
Args:
s: 需要判断的字符串
Returns:
如果包含非字母数字字符返回True否则返回False
"""
return bool(re.search(r'[^a-zA-Z0-9]', s))
def verify_signature(
self,
params: Dict[str, str],
secret_key: str,
max_age_ms: int = 300000
) -> Tuple[bool, Optional[str]]:
"""
验证签名
Args:
params: 所有请求参数,包括签名
secret_key: 密钥
max_age_ms: 允许的最大时间差(毫秒)
Returns:
验证结果和错误信息,成功为(True, None),失败为(False, 错误信息)
"""
# 获取并验证必要参数
if self.options.key_name not in params:
return False, f"缺少参数: {self.options.key_name}"
if self.options.channel_id_name not in params:
return False, f"缺少参数: {self.options.channel_id_name}"
if self.options.timestamp_name not in params:
return False, f"缺少参数: {self.options.timestamp_name}"
try:
timestamp = int(params[self.options.timestamp_name])
except ValueError:
return False, "无效的时间戳"
now = self.get_timestamp()
if abs(now - timestamp) > max_age_ms:
return False, "时间戳过期"
if self.options.nonce_name not in params:
return False, f"缺少参数: {self.options.nonce_name}"
# 这里可以插入nonce验证逻辑防止重放攻击
# 服务端需要维护一个时效性的nonce存储
if self.options.signature_name not in params:
return False, f"缺少参数: {self.options.signature_name}"
provided_signature = params[self.options.signature_name]
# 计算签名并比较
expected_signature = self.calculate_signature(params, secret_key)
if expected_signature == provided_signature:
return True, None
else:
return False, "签名不匹配"
# 用于命令行直接调用的简单示例
if __name__ == "__main__":
import argparse
import json
import os
from urllib.parse import parse_qsl
# # 加载环境变量
# dotenv.load_dotenv()
def parse_args():
parser = argparse.ArgumentParser(description='API签名工具')
parser.add_argument(
'-a', '--algorithm',
default=os.environ.get('SIGN_ALGORITHM', 'MD5'),
help='签名算法: MD5, SHA1, SHA256, HMAC_SHA256'
)
parser.add_argument(
'-k', '--access-key-id',
default=os.environ.get('ACCESS_KEY_ID', 'test-access-key-id'),
help='访问密钥ID'
)
parser.add_argument(
'-c', '--channel-id',
default=os.environ.get('CHANNEL_ID', 'test-channel-id'),
help='合作渠道方ID'
)
parser.add_argument(
'-s', '--secret-key',
default=os.environ.get('SECRET_KEY', 'test-secret-key'),
help='密钥'
)
parser.add_argument(
'-u', '--url',
default=os.environ.get('API_BASE_URL', 'https://api.example.com/v1/data'),
help='基础URL地址'
)
parser.add_argument(
'-p', '--param',
action='append',
help='请求参数格式为key=value可多次指定'
)
parser.add_argument(
'-j', '--json',
help='请求参数JSON格式'
)
parser.add_argument(
'-m', '--mode',
choices=['url', 'params', 'verify'],
default='url',
help='操作模式: url - 签名URL, params - 签名参数, verify - 验证签名'
)
return parser.parse_args()
def main():
args = parse_args()
# 解析算法
try:
algorithm = SignatureAlgorithm.from_str(args.algorithm)
except ValueError as e:
print(f"警告: {e}使用默认MD5算法")
algorithm = SignatureAlgorithm.MD5
# 创建签名选项
options = SignOptions(algorithm=algorithm)
# 创建签名工具
signer = ApiSigner(options)
# 解析参数
params = {}
if args.param:
for param in args.param:
if '=' in param:
key, value = param.split('=', 1)
params[key] = value
if args.json:
try:
json_params = json.loads(args.json)
if isinstance(json_params, dict):
params.update(json_params)
except json.JSONDecodeError:
print("警告: 无效的JSON参数")
# 如果没有参数,使用默认参数进行演示
if not params:
params = {
'userId': '12345',
'action': 'getData',
'data': '测试数据', # 包含非ASCII字符测试URL编码
}
print("===================== API签名示例 =====================")
print(f"AccessKeyId: {args.access_key_id}")
print(f"ChannelId: {args.channel_id}")
print(f"SecretKey: {args.secret_key}")
print(f"签名算法: {algorithm}")
print(f"基础URL: {args.url}")
print("请求参数:", params)
if args.mode == 'url':
# 签名URL
signed_url = signer.sign_url(args.url, params, args.access_key_id, args.secret_key, args.channel_id)
print("\n签名后的URL:")
print(signed_url)
elif args.mode == 'params':
# 获取签名后的参数
signed_params = signer.sign_request(params, args.access_key_id, args.secret_key, args.channel_id)
print("\n签名后的参数:")
for key, value in signed_params.items():
print(f" {key}: {value}")
elif args.mode == 'verify':
# 如果参数中已经包含签名,直接验证
if options.signature_name in params:
valid, error = signer.verify_signature(params, args.secret_key)
print("\n签名验证结果:")
if valid:
print(" 验证成功")
else:
print(f" 验证失败: {error}")
else:
# 先签名再验证(演示用)
signed_params = signer.sign_request(params, args.access_key_id, args.secret_key, args.channel_id)
valid, error = signer.verify_signature(signed_params, args.secret_key)
print("\n签名验证结果:")
if valid:
print(" 验证成功")
else:
print(f" 验证失败: {error}")
# 演示不同算法的签名结果
print("\n不同算法的签名结果:")
for alg in SignatureAlgorithm:
temp_options = SignOptions(algorithm=alg)
temp_signer = ApiSigner(temp_options)
# 添加必要的参数用于签名
sign_params = params.copy()
sign_params[options.key_name] = args.access_key_id
sign_params[options.channel_id_name] = args.channel_id
signature = temp_signer.calculate_signature(sign_params, args.secret_key)
print(f" {alg}: {signature}")
main()