459 lines
15 KiB
Python
459 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
API签名工具
|
||
|
||
提供API请求签名与验证功能,支持多种签名算法。
|
||
"""
|
||
|
||
import hashlib
|
||
import hmac
|
||
import re
|
||
import time
|
||
import urllib.parse
|
||
from enum import Enum, auto
|
||
from typing import Dict, List, Optional, Tuple, Union
|
||
|
||
|
||
class SignatureAlgorithm(Enum):
|
||
"""签名算法类型"""
|
||
MD5 = auto()
|
||
SHA1 = auto()
|
||
SHA256 = auto()
|
||
HMAC_SHA256 = auto()
|
||
|
||
@staticmethod
|
||
def from_str(algorithm_str: str) -> 'SignatureAlgorithm':
|
||
"""从字符串解析算法类型"""
|
||
algorithm_str = algorithm_str.upper()
|
||
if algorithm_str == 'MD5':
|
||
return SignatureAlgorithm.MD5
|
||
elif algorithm_str == 'SHA1':
|
||
return SignatureAlgorithm.SHA1
|
||
elif algorithm_str == 'SHA256':
|
||
return SignatureAlgorithm.SHA256
|
||
elif algorithm_str in ('HMAC_SHA256', 'HMACSHA256', 'HMAC-SHA256'):
|
||
return SignatureAlgorithm.HMAC_SHA256
|
||
else:
|
||
raise ValueError(f"无效的签名算法: {algorithm_str}")
|
||
|
||
def __str__(self) -> str:
|
||
if self == SignatureAlgorithm.MD5:
|
||
return "MD5"
|
||
elif self == SignatureAlgorithm.SHA1:
|
||
return "SHA1"
|
||
elif self == SignatureAlgorithm.SHA256:
|
||
return "SHA256"
|
||
elif self == SignatureAlgorithm.HMAC_SHA256:
|
||
return "HMAC-SHA256"
|
||
return "UNKNOWN"
|
||
|
||
|
||
class SignOptions:
|
||
"""签名选项"""
|
||
|
||
def __init__(
|
||
self,
|
||
algorithm: SignatureAlgorithm = SignatureAlgorithm.MD5,
|
||
key_name: str = "AccessKeyId",
|
||
channel_id_name: str = "channelId",
|
||
timestamp_name: str = "timestamp",
|
||
nonce_name: str = "nonce",
|
||
signature_name: str = "sign"
|
||
):
|
||
"""
|
||
初始化签名选项
|
||
|
||
Args:
|
||
algorithm: 签名算法
|
||
key_name: AccessKeyId参数名
|
||
channel_id_name: 合作渠道方ID参数名
|
||
timestamp_name: 时间戳参数名
|
||
nonce_name: 随机字符串参数名
|
||
signature_name: 签名参数名
|
||
"""
|
||
self.algorithm = algorithm
|
||
self.key_name = key_name
|
||
self.channel_id_name = channel_id_name
|
||
self.timestamp_name = timestamp_name
|
||
self.nonce_name = nonce_name
|
||
self.signature_name = signature_name
|
||
|
||
|
||
class ApiSigner:
|
||
"""API签名工具"""
|
||
|
||
def __init__(self, options: Optional[SignOptions] = None):
|
||
"""
|
||
初始化签名工具
|
||
|
||
Args:
|
||
options: 签名选项,如为None则使用默认选项
|
||
"""
|
||
self.options = options if options is not None else SignOptions()
|
||
|
||
def generate_nonce(self) -> str:
|
||
"""
|
||
生成随机字符串
|
||
|
||
Returns:
|
||
一个基于当前时间的随机字符串
|
||
"""
|
||
return f"{int(time.time() * 1000)}{int(time.time()) % 1000}"
|
||
|
||
def get_timestamp(self) -> int:
|
||
"""
|
||
获取当前时间戳(毫秒)
|
||
|
||
Returns:
|
||
当前的Unix时间戳(毫秒)
|
||
"""
|
||
return int(time.time() * 1000)
|
||
|
||
def sign_request(
|
||
self,
|
||
params: Dict[str, str],
|
||
access_key_id: str,
|
||
secret_key: str,
|
||
channel_id: str
|
||
) -> Dict[str, str]:
|
||
"""
|
||
对请求进行签名
|
||
|
||
Args:
|
||
params: 请求参数
|
||
access_key_id: 访问密钥ID
|
||
secret_key: 密钥
|
||
channel_id: 合作渠道方ID
|
||
|
||
Returns:
|
||
添加了签名的完整参数
|
||
"""
|
||
# 创建用于签名的参数副本
|
||
signed_params = params.copy()
|
||
|
||
# 添加认证参数
|
||
timestamp = self.get_timestamp()
|
||
signed_params[self.options.key_name] = access_key_id
|
||
signed_params[self.options.channel_id_name] = channel_id
|
||
signed_params[self.options.timestamp_name] = str(timestamp)
|
||
signed_params[self.options.nonce_name] = self.generate_nonce()
|
||
|
||
# 计算签名
|
||
signature = self.calculate_signature(signed_params, secret_key)
|
||
|
||
# 添加签名到参数
|
||
signed_params[self.options.signature_name] = signature
|
||
|
||
return signed_params
|
||
|
||
def sign_url(
|
||
self,
|
||
base_url: str,
|
||
params: Dict[str, str],
|
||
access_key_id: str,
|
||
secret_key: str,
|
||
channel_id: str
|
||
) -> str:
|
||
"""
|
||
对URL进行签名
|
||
|
||
Args:
|
||
base_url: 基础URL地址
|
||
params: 请求参数
|
||
access_key_id: 访问密钥ID
|
||
secret_key: 密钥
|
||
channel_id: 合作渠道方ID
|
||
|
||
Returns:
|
||
添加了签名的完整URL
|
||
"""
|
||
# 对请求签名
|
||
signed_params = self.sign_request(params, access_key_id, secret_key, channel_id)
|
||
|
||
# 构建URL查询字符串
|
||
query_string = urllib.parse.urlencode(signed_params)
|
||
|
||
# 处理已有参数的URL
|
||
if '?' in base_url:
|
||
return f"{base_url}&{query_string}"
|
||
else:
|
||
return f"{base_url}?{query_string}"
|
||
|
||
def calculate_signature(self, params: Dict[str, str], secret_key: str) -> str:
|
||
"""
|
||
计算签名
|
||
|
||
Args:
|
||
params: 请求参数
|
||
secret_key: 密钥
|
||
|
||
Returns:
|
||
签名字符串
|
||
"""
|
||
# 按键名排序并生成规范化的请求字符串
|
||
signing_string = self.create_signing_string(params)
|
||
|
||
# 添加密钥
|
||
signing_string = f"{signing_string}&key={secret_key}"
|
||
|
||
# 使用指定算法计算签名
|
||
if self.options.algorithm == SignatureAlgorithm.MD5:
|
||
return hashlib.md5(signing_string.encode('utf-8')).hexdigest()
|
||
elif self.options.algorithm == SignatureAlgorithm.SHA1:
|
||
return hashlib.sha1(signing_string.encode('utf-8')).hexdigest()
|
||
elif self.options.algorithm == SignatureAlgorithm.SHA256:
|
||
return hashlib.sha256(signing_string.encode('utf-8')).hexdigest()
|
||
elif self.options.algorithm == SignatureAlgorithm.HMAC_SHA256:
|
||
return hmac.new(
|
||
secret_key.encode('utf-8'),
|
||
signing_string.encode('utf-8'),
|
||
hashlib.sha256
|
||
).hexdigest()
|
||
else:
|
||
return hashlib.md5(signing_string.encode('utf-8')).hexdigest()
|
||
|
||
def create_signing_string(self, params: Dict[str, str]) -> str:
|
||
"""
|
||
创建用于签名的规范化字符串
|
||
|
||
Args:
|
||
params: 请求参数
|
||
|
||
Returns:
|
||
按键名排序并拼接的字符串
|
||
"""
|
||
# 排除签名参数
|
||
sorted_params = {k: v for k, v in params.items() if k != self.options.signature_name}
|
||
|
||
# 获取所有键并排序
|
||
keys = sorted(sorted_params.keys())
|
||
|
||
# 构建签名字符串
|
||
parts = []
|
||
for key in keys:
|
||
value = sorted_params[key]
|
||
# 仅对非字母数字字符进行URL编码
|
||
if self.needs_url_encode(value):
|
||
value = urllib.parse.quote(value, safe='')
|
||
parts.append(f"{key}={value}")
|
||
|
||
return '&'.join(parts)
|
||
|
||
def needs_url_encode(self, s: str) -> bool:
|
||
"""
|
||
判断是否需要对字符串进行URL编码
|
||
|
||
Args:
|
||
s: 需要判断的字符串
|
||
|
||
Returns:
|
||
如果包含非字母数字字符,返回True,否则返回False
|
||
"""
|
||
return bool(re.search(r'[^a-zA-Z0-9]', s))
|
||
|
||
def verify_signature(
|
||
self,
|
||
params: Dict[str, str],
|
||
secret_key: str,
|
||
max_age_ms: int = 300000
|
||
) -> Tuple[bool, Optional[str]]:
|
||
"""
|
||
验证签名
|
||
|
||
Args:
|
||
params: 所有请求参数,包括签名
|
||
secret_key: 密钥
|
||
max_age_ms: 允许的最大时间差(毫秒)
|
||
|
||
Returns:
|
||
验证结果和错误信息,成功为(True, None),失败为(False, 错误信息)
|
||
"""
|
||
# 获取并验证必要参数
|
||
if self.options.key_name not in params:
|
||
return False, f"缺少参数: {self.options.key_name}"
|
||
|
||
if self.options.channel_id_name not in params:
|
||
return False, f"缺少参数: {self.options.channel_id_name}"
|
||
|
||
if self.options.timestamp_name not in params:
|
||
return False, f"缺少参数: {self.options.timestamp_name}"
|
||
|
||
try:
|
||
timestamp = int(params[self.options.timestamp_name])
|
||
except ValueError:
|
||
return False, "无效的时间戳"
|
||
|
||
now = self.get_timestamp()
|
||
if abs(now - timestamp) > max_age_ms:
|
||
return False, "时间戳过期"
|
||
|
||
if self.options.nonce_name not in params:
|
||
return False, f"缺少参数: {self.options.nonce_name}"
|
||
|
||
# 这里可以插入nonce验证逻辑,防止重放攻击
|
||
# 服务端需要维护一个时效性的nonce存储
|
||
|
||
if self.options.signature_name not in params:
|
||
return False, f"缺少参数: {self.options.signature_name}"
|
||
|
||
provided_signature = params[self.options.signature_name]
|
||
|
||
# 计算签名并比较
|
||
expected_signature = self.calculate_signature(params, secret_key)
|
||
if expected_signature == provided_signature:
|
||
return True, None
|
||
else:
|
||
return False, "签名不匹配"
|
||
|
||
|
||
# 用于命令行直接调用的简单示例
|
||
if __name__ == "__main__":
|
||
import argparse
|
||
import json
|
||
import os
|
||
from urllib.parse import parse_qsl
|
||
|
||
# # 加载环境变量
|
||
# dotenv.load_dotenv()
|
||
|
||
def parse_args():
|
||
parser = argparse.ArgumentParser(description='API签名工具')
|
||
parser.add_argument(
|
||
'-a', '--algorithm',
|
||
default=os.environ.get('SIGN_ALGORITHM', 'MD5'),
|
||
help='签名算法: MD5, SHA1, SHA256, HMAC_SHA256'
|
||
)
|
||
parser.add_argument(
|
||
'-k', '--access-key-id',
|
||
default=os.environ.get('ACCESS_KEY_ID', 'test-access-key-id'),
|
||
help='访问密钥ID'
|
||
)
|
||
parser.add_argument(
|
||
'-c', '--channel-id',
|
||
default=os.environ.get('CHANNEL_ID', 'test-channel-id'),
|
||
help='合作渠道方ID'
|
||
)
|
||
parser.add_argument(
|
||
'-s', '--secret-key',
|
||
default=os.environ.get('SECRET_KEY', 'test-secret-key'),
|
||
help='密钥'
|
||
)
|
||
parser.add_argument(
|
||
'-u', '--url',
|
||
default=os.environ.get('API_BASE_URL', 'https://api.example.com/v1/data'),
|
||
help='基础URL地址'
|
||
)
|
||
parser.add_argument(
|
||
'-p', '--param',
|
||
action='append',
|
||
help='请求参数,格式为key=value,可多次指定'
|
||
)
|
||
parser.add_argument(
|
||
'-j', '--json',
|
||
help='请求参数,JSON格式'
|
||
)
|
||
parser.add_argument(
|
||
'-m', '--mode',
|
||
choices=['url', 'params', 'verify'],
|
||
default='url',
|
||
help='操作模式: url - 签名URL, params - 签名参数, verify - 验证签名'
|
||
)
|
||
return parser.parse_args()
|
||
|
||
def main():
|
||
args = parse_args()
|
||
|
||
# 解析算法
|
||
try:
|
||
algorithm = SignatureAlgorithm.from_str(args.algorithm)
|
||
except ValueError as e:
|
||
print(f"警告: {e},使用默认MD5算法")
|
||
algorithm = SignatureAlgorithm.MD5
|
||
|
||
# 创建签名选项
|
||
options = SignOptions(algorithm=algorithm)
|
||
|
||
# 创建签名工具
|
||
signer = ApiSigner(options)
|
||
|
||
# 解析参数
|
||
params = {}
|
||
if args.param:
|
||
for param in args.param:
|
||
if '=' in param:
|
||
key, value = param.split('=', 1)
|
||
params[key] = value
|
||
|
||
if args.json:
|
||
try:
|
||
json_params = json.loads(args.json)
|
||
if isinstance(json_params, dict):
|
||
params.update(json_params)
|
||
except json.JSONDecodeError:
|
||
print("警告: 无效的JSON参数")
|
||
|
||
# 如果没有参数,使用默认参数进行演示
|
||
if not params:
|
||
params = {
|
||
'userId': '12345',
|
||
'action': 'getData',
|
||
'data': '测试数据', # 包含非ASCII字符,测试URL编码
|
||
}
|
||
|
||
print("===================== API签名示例 =====================")
|
||
print(f"AccessKeyId: {args.access_key_id}")
|
||
print(f"ChannelId: {args.channel_id}")
|
||
print(f"SecretKey: {args.secret_key}")
|
||
print(f"签名算法: {algorithm}")
|
||
print(f"基础URL: {args.url}")
|
||
print("请求参数:", params)
|
||
|
||
if args.mode == 'url':
|
||
# 签名URL
|
||
signed_url = signer.sign_url(args.url, params, args.access_key_id, args.secret_key, args.channel_id)
|
||
print("\n签名后的URL:")
|
||
print(signed_url)
|
||
|
||
elif args.mode == 'params':
|
||
# 获取签名后的参数
|
||
signed_params = signer.sign_request(params, args.access_key_id, args.secret_key, args.channel_id)
|
||
print("\n签名后的参数:")
|
||
for key, value in signed_params.items():
|
||
print(f" {key}: {value}")
|
||
|
||
elif args.mode == 'verify':
|
||
# 如果参数中已经包含签名,直接验证
|
||
if options.signature_name in params:
|
||
valid, error = signer.verify_signature(params, args.secret_key)
|
||
print("\n签名验证结果:")
|
||
if valid:
|
||
print(" 验证成功")
|
||
else:
|
||
print(f" 验证失败: {error}")
|
||
else:
|
||
# 先签名再验证(演示用)
|
||
signed_params = signer.sign_request(params, args.access_key_id, args.secret_key, args.channel_id)
|
||
valid, error = signer.verify_signature(signed_params, args.secret_key)
|
||
print("\n签名验证结果:")
|
||
if valid:
|
||
print(" 验证成功")
|
||
else:
|
||
print(f" 验证失败: {error}")
|
||
|
||
# 演示不同算法的签名结果
|
||
print("\n不同算法的签名结果:")
|
||
for alg in SignatureAlgorithm:
|
||
temp_options = SignOptions(algorithm=alg)
|
||
temp_signer = ApiSigner(temp_options)
|
||
|
||
# 添加必要的参数用于签名
|
||
sign_params = params.copy()
|
||
sign_params[options.key_name] = args.access_key_id
|
||
sign_params[options.channel_id_name] = args.channel_id
|
||
|
||
signature = temp_signer.calculate_signature(sign_params, args.secret_key)
|
||
print(f" {alg}: {signature}")
|
||
|
||
main() |