#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ API签名工具 提供API请求签名与验证功能,支持多种签名算法。 """ import hashlib import hmac import re import time import urllib.parse from enum import Enum, auto from typing import Dict, List, Optional, Tuple, Union class SignatureAlgorithm(Enum): """签名算法类型""" MD5 = auto() SHA1 = auto() SHA256 = auto() HMAC_SHA256 = auto() @staticmethod def from_str(algorithm_str: str) -> 'SignatureAlgorithm': """从字符串解析算法类型""" algorithm_str = algorithm_str.upper() if algorithm_str == 'MD5': return SignatureAlgorithm.MD5 elif algorithm_str == 'SHA1': return SignatureAlgorithm.SHA1 elif algorithm_str == 'SHA256': return SignatureAlgorithm.SHA256 elif algorithm_str in ('HMAC_SHA256', 'HMACSHA256', 'HMAC-SHA256'): return SignatureAlgorithm.HMAC_SHA256 else: raise ValueError(f"无效的签名算法: {algorithm_str}") def __str__(self) -> str: if self == SignatureAlgorithm.MD5: return "MD5" elif self == SignatureAlgorithm.SHA1: return "SHA1" elif self == SignatureAlgorithm.SHA256: return "SHA256" elif self == SignatureAlgorithm.HMAC_SHA256: return "HMAC-SHA256" return "UNKNOWN" class SignOptions: """签名选项""" def __init__( self, algorithm: SignatureAlgorithm = SignatureAlgorithm.MD5, key_name: str = "AccessKeyId", channel_id_name: str = "channelId", timestamp_name: str = "timestamp", nonce_name: str = "nonce", signature_name: str = "sign" ): """ 初始化签名选项 Args: algorithm: 签名算法 key_name: AccessKeyId参数名 channel_id_name: 合作渠道方ID参数名 timestamp_name: 时间戳参数名 nonce_name: 随机字符串参数名 signature_name: 签名参数名 """ self.algorithm = algorithm self.key_name = key_name self.channel_id_name = channel_id_name self.timestamp_name = timestamp_name self.nonce_name = nonce_name self.signature_name = signature_name class ApiSigner: """API签名工具""" def __init__(self, options: Optional[SignOptions] = None): """ 初始化签名工具 Args: options: 签名选项,如为None则使用默认选项 """ self.options = options if options is not None else SignOptions() def generate_nonce(self) -> str: """ 生成随机字符串 Returns: 一个基于当前时间的随机字符串 """ return f"{int(time.time() * 1000)}{int(time.time()) % 1000}" def get_timestamp(self) -> int: """ 获取当前时间戳(毫秒) Returns: 当前的Unix时间戳(毫秒) """ return int(time.time() * 1000) def sign_request( self, params: Dict[str, str], access_key_id: str, secret_key: str, channel_id: str ) -> Dict[str, str]: """ 对请求进行签名 Args: params: 请求参数 access_key_id: 访问密钥ID secret_key: 密钥 channel_id: 合作渠道方ID Returns: 添加了签名的完整参数 """ # 创建用于签名的参数副本 signed_params = params.copy() # 添加认证参数 timestamp = self.get_timestamp() signed_params[self.options.key_name] = access_key_id signed_params[self.options.channel_id_name] = channel_id signed_params[self.options.timestamp_name] = str(timestamp) signed_params[self.options.nonce_name] = self.generate_nonce() # 计算签名 signature = self.calculate_signature(signed_params, secret_key) # 添加签名到参数 signed_params[self.options.signature_name] = signature return signed_params def sign_url( self, base_url: str, params: Dict[str, str], access_key_id: str, secret_key: str, channel_id: str ) -> str: """ 对URL进行签名 Args: base_url: 基础URL地址 params: 请求参数 access_key_id: 访问密钥ID secret_key: 密钥 channel_id: 合作渠道方ID Returns: 添加了签名的完整URL """ # 对请求签名 signed_params = self.sign_request(params, access_key_id, secret_key, channel_id) # 构建URL查询字符串 query_string = urllib.parse.urlencode(signed_params) # 处理已有参数的URL if '?' in base_url: return f"{base_url}&{query_string}" else: return f"{base_url}?{query_string}" def calculate_signature(self, params: Dict[str, str], secret_key: str) -> str: """ 计算签名 Args: params: 请求参数 secret_key: 密钥 Returns: 签名字符串 """ # 按键名排序并生成规范化的请求字符串 signing_string = self.create_signing_string(params) # 添加密钥 signing_string = f"{signing_string}&key={secret_key}" # 使用指定算法计算签名 if self.options.algorithm == SignatureAlgorithm.MD5: return hashlib.md5(signing_string.encode('utf-8')).hexdigest() elif self.options.algorithm == SignatureAlgorithm.SHA1: return hashlib.sha1(signing_string.encode('utf-8')).hexdigest() elif self.options.algorithm == SignatureAlgorithm.SHA256: return hashlib.sha256(signing_string.encode('utf-8')).hexdigest() elif self.options.algorithm == SignatureAlgorithm.HMAC_SHA256: return hmac.new( secret_key.encode('utf-8'), signing_string.encode('utf-8'), hashlib.sha256 ).hexdigest() else: return hashlib.md5(signing_string.encode('utf-8')).hexdigest() def create_signing_string(self, params: Dict[str, str]) -> str: """ 创建用于签名的规范化字符串 Args: params: 请求参数 Returns: 按键名排序并拼接的字符串 """ # 排除签名参数 sorted_params = {k: v for k, v in params.items() if k != self.options.signature_name} # 获取所有键并排序 keys = sorted(sorted_params.keys()) # 构建签名字符串 parts = [] for key in keys: value = sorted_params[key] # 仅对非字母数字字符进行URL编码 if self.needs_url_encode(value): value = urllib.parse.quote(value, safe='') parts.append(f"{key}={value}") return '&'.join(parts) def needs_url_encode(self, s: str) -> bool: """ 判断是否需要对字符串进行URL编码 Args: s: 需要判断的字符串 Returns: 如果包含非字母数字字符,返回True,否则返回False """ return bool(re.search(r'[^a-zA-Z0-9]', s)) def verify_signature( self, params: Dict[str, str], secret_key: str, max_age_ms: int = 300000 ) -> Tuple[bool, Optional[str]]: """ 验证签名 Args: params: 所有请求参数,包括签名 secret_key: 密钥 max_age_ms: 允许的最大时间差(毫秒) Returns: 验证结果和错误信息,成功为(True, None),失败为(False, 错误信息) """ # 获取并验证必要参数 if self.options.key_name not in params: return False, f"缺少参数: {self.options.key_name}" if self.options.channel_id_name not in params: return False, f"缺少参数: {self.options.channel_id_name}" if self.options.timestamp_name not in params: return False, f"缺少参数: {self.options.timestamp_name}" try: timestamp = int(params[self.options.timestamp_name]) except ValueError: return False, "无效的时间戳" now = self.get_timestamp() if abs(now - timestamp) > max_age_ms: return False, "时间戳过期" if self.options.nonce_name not in params: return False, f"缺少参数: {self.options.nonce_name}" # 这里可以插入nonce验证逻辑,防止重放攻击 # 服务端需要维护一个时效性的nonce存储 if self.options.signature_name not in params: return False, f"缺少参数: {self.options.signature_name}" provided_signature = params[self.options.signature_name] # 计算签名并比较 expected_signature = self.calculate_signature(params, secret_key) if expected_signature == provided_signature: return True, None else: return False, "签名不匹配" # 用于命令行直接调用的简单示例 if __name__ == "__main__": import argparse import json import os from urllib.parse import parse_qsl # # 加载环境变量 # dotenv.load_dotenv() def parse_args(): parser = argparse.ArgumentParser(description='API签名工具') parser.add_argument( '-a', '--algorithm', default=os.environ.get('SIGN_ALGORITHM', 'MD5'), help='签名算法: MD5, SHA1, SHA256, HMAC_SHA256' ) parser.add_argument( '-k', '--access-key-id', default=os.environ.get('ACCESS_KEY_ID', 'test-access-key-id'), help='访问密钥ID' ) parser.add_argument( '-c', '--channel-id', default=os.environ.get('CHANNEL_ID', 'test-channel-id'), help='合作渠道方ID' ) parser.add_argument( '-s', '--secret-key', default=os.environ.get('SECRET_KEY', 'test-secret-key'), help='密钥' ) parser.add_argument( '-u', '--url', default=os.environ.get('API_BASE_URL', 'https://api.example.com/v1/data'), help='基础URL地址' ) parser.add_argument( '-p', '--param', action='append', help='请求参数,格式为key=value,可多次指定' ) parser.add_argument( '-j', '--json', help='请求参数,JSON格式' ) parser.add_argument( '-m', '--mode', choices=['url', 'params', 'verify'], default='url', help='操作模式: url - 签名URL, params - 签名参数, verify - 验证签名' ) return parser.parse_args() def main(): args = parse_args() # 解析算法 try: algorithm = SignatureAlgorithm.from_str(args.algorithm) except ValueError as e: print(f"警告: {e},使用默认MD5算法") algorithm = SignatureAlgorithm.MD5 # 创建签名选项 options = SignOptions(algorithm=algorithm) # 创建签名工具 signer = ApiSigner(options) # 解析参数 params = {} if args.param: for param in args.param: if '=' in param: key, value = param.split('=', 1) params[key] = value if args.json: try: json_params = json.loads(args.json) if isinstance(json_params, dict): params.update(json_params) except json.JSONDecodeError: print("警告: 无效的JSON参数") # 如果没有参数,使用默认参数进行演示 if not params: params = { 'userId': '12345', 'action': 'getData', 'data': '测试数据', # 包含非ASCII字符,测试URL编码 } print("===================== API签名示例 =====================") print(f"AccessKeyId: {args.access_key_id}") print(f"ChannelId: {args.channel_id}") print(f"SecretKey: {args.secret_key}") print(f"签名算法: {algorithm}") print(f"基础URL: {args.url}") print("请求参数:", params) if args.mode == 'url': # 签名URL signed_url = signer.sign_url(args.url, params, args.access_key_id, args.secret_key, args.channel_id) print("\n签名后的URL:") print(signed_url) elif args.mode == 'params': # 获取签名后的参数 signed_params = signer.sign_request(params, args.access_key_id, args.secret_key, args.channel_id) print("\n签名后的参数:") for key, value in signed_params.items(): print(f" {key}: {value}") elif args.mode == 'verify': # 如果参数中已经包含签名,直接验证 if options.signature_name in params: valid, error = signer.verify_signature(params, args.secret_key) print("\n签名验证结果:") if valid: print(" 验证成功") else: print(f" 验证失败: {error}") else: # 先签名再验证(演示用) signed_params = signer.sign_request(params, args.access_key_id, args.secret_key, args.channel_id) valid, error = signer.verify_signature(signed_params, args.secret_key) print("\n签名验证结果:") if valid: print(" 验证成功") else: print(f" 验证失败: {error}") # 演示不同算法的签名结果 print("\n不同算法的签名结果:") for alg in SignatureAlgorithm: temp_options = SignOptions(algorithm=alg) temp_signer = ApiSigner(temp_options) # 添加必要的参数用于签名 sign_params = params.copy() sign_params[options.key_name] = args.access_key_id sign_params[options.channel_id_name] = args.channel_id signature = temp_signer.calculate_signature(sign_params, args.secret_key) print(f" {alg}: {signature}") main()