From f9f75b7a027e4ca7d2224a888caed8dd6379408e Mon Sep 17 00:00:00 2001 From: SF-bytebytebrew Date: Wed, 21 May 2025 15:36:26 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20=E6=B7=BB=E5=8A=A0Python=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E7=9A=84API=E7=AD=BE=E5=90=8D=E5=B7=A5=E5=85=B7?= =?UTF-8?q?=EF=BC=8C=E5=8C=85=E6=8B=AC=E6=A0=B8=E5=BF=83=E5=BA=93=E3=80=81?= =?UTF-8?q?=E5=91=BD=E4=BB=A4=E8=A1=8C=E5=B7=A5=E5=85=B7=E5=92=8C=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E6=96=87=E6=A1=A3=EF=BC=8C=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?=E7=A7=8D=E7=AD=BE=E5=90=8D=E7=AE=97=E6=B3=95=E5=92=8C=E7=8E=AF?= =?UTF-8?q?=E5=A2=83=E5=8F=98=E9=87=8F=E9=85=8D=E7=BD=AE=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/README.md | 167 +++++++++++++++ python/api_signer.py | 459 ++++++++++++++++++++++++++++++++++++++++ python/requirements.txt | 1 + 3 files changed, 627 insertions(+) create mode 100644 python/README.md create mode 100644 python/api_signer.py create mode 100644 python/requirements.txt diff --git a/python/README.md b/python/README.md new file mode 100644 index 0000000..b5bb4cf --- /dev/null +++ b/python/README.md @@ -0,0 +1,167 @@ +# API签名工具 - Python实现 + +## 项目结构 + +根据实际项目文件结构: + +```plaintext +python/ +├── api_signer.py # 签名工具实现(单文件版) +├── requirements.txt # 依赖配置 +``` + +## 使用方法 + +### 安装依赖 + +```bash +pip install -r requirements.txt +``` + +### 运行命令行工具 + +```bash +python api_signer.py [选项] +``` + +### 命令行选项 + +| 选项 | 描述 | +|------|------| +| `-a, --algorithm` | 签名算法: md5, sha1, sha256, hmac-sha256 | +| `-u, --url` | API基础URL | +| `-p, --param` | 请求参数,格式为key=value,可多次使用 | +| `-k, --key` | 访问密钥ID | +| `-s, --secret` | 密钥 | +| `-c, --channel` | 合作渠道方ID | +| `-h, --help` | 显示帮助信息 | +| `-j, --json` | 以JSON格式指定参数 | + +### 常用命令示例 + +**基本用法** + +```bash +python api_signer.py +``` + +**自定义参数** + +```bash +python api_signer.py \ + -u "https://api.example.com/user/info" \ + -p "userId=12345" -p "action=getInfo" \ + -k "YOUR_ACCESS_KEY" \ + -s "YOUR_SECRET_KEY" \ + -c "3" +``` + +**使用JSON参数** + +```bash +python api_signer.py -j '{"userId": "12345", "action": "getData"}' +``` + +**指定签名算法** + +```bash +python api_signer.py -a sha256 +``` + +**帮助信息** + +```bash +python api_signer.py --help +``` + +### API接口测试实例 + +使用真实API接口进行测试: + +```bash +# 未签名的API调用测试 - 返回错误 +curl "https://api-v1.sound-force.com:8443/p/album/single/media-url?channelId=3&singleId=381980" +# 返回: {"code":400,"data":null,"msg":"Missing AccessKeyId","success":false} + +# 生成访问https://api-v1.sound-force.com:8443/p/album/single/media-url的签名URL +python api_signer.py \ + -a md5 \ + -u "https://api-v1.sound-force.com:8443/p/album/single/media-url" \ + -p "singleId=381980" \ + -k "YOUR_ACCESS_KEY" \ + -s "YOUR_SECRET_KEY" \ + -c "3" + +# 使用curl测试API接口 +signed_url=$(python api_signer.py \ + -a md5 \ + -u "https://api-v1.sound-force.com:8443/p/album/single/media-url" \ + -p "singleId=381980" \ + -k "YOUR_ACCESS_KEY" \ + -s "YOUR_SECRET_KEY" \ + -c "3" | grep -A 1 "签名后的URL:" | tail -n 1) +curl -v "$signed_url" + +# 使用Python requests库测试 +python -c " +import requests +import subprocess + +url = subprocess.check_output(\"python api_signer.py -a md5 -u 'https://api-v1.sound-force.com:8443/p/album/single/media-url' -p 'singleId=381980' -k 'YOUR_ACCESS_KEY' -s 'YOUR_SECRET_KEY' -c '3' | grep -A 1 '签名后的URL:' | tail -n 1\", shell=True).decode().strip() +response = requests.get(url) +print(f'状态码: {response.status_code}') +print(f'响应内容: {response.text}') +" +``` + +请注意: + +- 替换`YOUR_ACCESS_KEY`为实际的访问密钥ID +- 替换`YOUR_SECRET_KEY`为实际的密钥 +- 示例使用的渠道ID为`3`,请根据实际情况调整 + +使用有效的密钥和签名后,API接口将返回成功响应(状态码200)并提供媒体URL数据。 + +### 代码集成 + +```python +from api_signer import ApiSigner, SignOptions, SignatureAlgorithm + +# 创建签名选项 +options = SignOptions(algorithm=SignatureAlgorithm.MD5) + +# 创建签名工具 +signer = ApiSigner(options) + +# 准备请求参数 +params = { + "singleId": "381980" +} + +# 执行签名 +signed_params = signer.sign_request( + params, + "YOUR_ACCESS_KEY", + "YOUR_SECRET_KEY", + "3" +) + +# 或签名URL +signed_url = signer.sign_url( + "https://api-v1.sound-force.com:8443/p/album/single/media-url", + params, + "YOUR_ACCESS_KEY", + "YOUR_SECRET_KEY", + "3" +) +``` + +### 环境变量 + +该工具支持从`.env`文件加载以下配置: + +- `ACCESS_KEY_ID`: 访问密钥ID +- `SECRET_KEY`: 密钥 +- `CHANNEL_ID`: 渠道ID +- `SIGN_ALGORITHM`: 签名算法 +- `API_BASE_URL`: API基础URL diff --git a/python/api_signer.py b/python/api_signer.py new file mode 100644 index 0000000..6d00d81 --- /dev/null +++ b/python/api_signer.py @@ -0,0 +1,459 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +API签名工具 + +提供API请求签名与验证功能,支持多种签名算法。 +""" + +import hashlib +import hmac +import re +import time +import urllib.parse +from enum import Enum, auto +from typing import Dict, List, Optional, Tuple, Union + + +class SignatureAlgorithm(Enum): + """签名算法类型""" + MD5 = auto() + SHA1 = auto() + SHA256 = auto() + HMAC_SHA256 = auto() + + @staticmethod + def from_str(algorithm_str: str) -> 'SignatureAlgorithm': + """从字符串解析算法类型""" + algorithm_str = algorithm_str.upper() + if algorithm_str == 'MD5': + return SignatureAlgorithm.MD5 + elif algorithm_str == 'SHA1': + return SignatureAlgorithm.SHA1 + elif algorithm_str == 'SHA256': + return SignatureAlgorithm.SHA256 + elif algorithm_str in ('HMAC_SHA256', 'HMACSHA256', 'HMAC-SHA256'): + return SignatureAlgorithm.HMAC_SHA256 + else: + raise ValueError(f"无效的签名算法: {algorithm_str}") + + def __str__(self) -> str: + if self == SignatureAlgorithm.MD5: + return "MD5" + elif self == SignatureAlgorithm.SHA1: + return "SHA1" + elif self == SignatureAlgorithm.SHA256: + return "SHA256" + elif self == SignatureAlgorithm.HMAC_SHA256: + return "HMAC-SHA256" + return "UNKNOWN" + + +class SignOptions: + """签名选项""" + + def __init__( + self, + algorithm: SignatureAlgorithm = SignatureAlgorithm.MD5, + key_name: str = "AccessKeyId", + channel_id_name: str = "channelId", + timestamp_name: str = "timestamp", + nonce_name: str = "nonce", + signature_name: str = "sign" + ): + """ + 初始化签名选项 + + Args: + algorithm: 签名算法 + key_name: AccessKeyId参数名 + channel_id_name: 合作渠道方ID参数名 + timestamp_name: 时间戳参数名 + nonce_name: 随机字符串参数名 + signature_name: 签名参数名 + """ + self.algorithm = algorithm + self.key_name = key_name + self.channel_id_name = channel_id_name + self.timestamp_name = timestamp_name + self.nonce_name = nonce_name + self.signature_name = signature_name + + +class ApiSigner: + """API签名工具""" + + def __init__(self, options: Optional[SignOptions] = None): + """ + 初始化签名工具 + + Args: + options: 签名选项,如为None则使用默认选项 + """ + self.options = options if options is not None else SignOptions() + + def generate_nonce(self) -> str: + """ + 生成随机字符串 + + Returns: + 一个基于当前时间的随机字符串 + """ + return f"{int(time.time() * 1000)}{int(time.time()) % 1000}" + + def get_timestamp(self) -> int: + """ + 获取当前时间戳(毫秒) + + Returns: + 当前的Unix时间戳(毫秒) + """ + return int(time.time() * 1000) + + def sign_request( + self, + params: Dict[str, str], + access_key_id: str, + secret_key: str, + channel_id: str + ) -> Dict[str, str]: + """ + 对请求进行签名 + + Args: + params: 请求参数 + access_key_id: 访问密钥ID + secret_key: 密钥 + channel_id: 合作渠道方ID + + Returns: + 添加了签名的完整参数 + """ + # 创建用于签名的参数副本 + signed_params = params.copy() + + # 添加认证参数 + timestamp = self.get_timestamp() + signed_params[self.options.key_name] = access_key_id + signed_params[self.options.channel_id_name] = channel_id + signed_params[self.options.timestamp_name] = str(timestamp) + signed_params[self.options.nonce_name] = self.generate_nonce() + + # 计算签名 + signature = self.calculate_signature(signed_params, secret_key) + + # 添加签名到参数 + signed_params[self.options.signature_name] = signature + + return signed_params + + def sign_url( + self, + base_url: str, + params: Dict[str, str], + access_key_id: str, + secret_key: str, + channel_id: str + ) -> str: + """ + 对URL进行签名 + + Args: + base_url: 基础URL地址 + params: 请求参数 + access_key_id: 访问密钥ID + secret_key: 密钥 + channel_id: 合作渠道方ID + + Returns: + 添加了签名的完整URL + """ + # 对请求签名 + signed_params = self.sign_request(params, access_key_id, secret_key, channel_id) + + # 构建URL查询字符串 + query_string = urllib.parse.urlencode(signed_params) + + # 处理已有参数的URL + if '?' in base_url: + return f"{base_url}&{query_string}" + else: + return f"{base_url}?{query_string}" + + def calculate_signature(self, params: Dict[str, str], secret_key: str) -> str: + """ + 计算签名 + + Args: + params: 请求参数 + secret_key: 密钥 + + Returns: + 签名字符串 + """ + # 按键名排序并生成规范化的请求字符串 + signing_string = self.create_signing_string(params) + + # 添加密钥 + signing_string = f"{signing_string}&key={secret_key}" + + # 使用指定算法计算签名 + if self.options.algorithm == SignatureAlgorithm.MD5: + return hashlib.md5(signing_string.encode('utf-8')).hexdigest() + elif self.options.algorithm == SignatureAlgorithm.SHA1: + return hashlib.sha1(signing_string.encode('utf-8')).hexdigest() + elif self.options.algorithm == SignatureAlgorithm.SHA256: + return hashlib.sha256(signing_string.encode('utf-8')).hexdigest() + elif self.options.algorithm == SignatureAlgorithm.HMAC_SHA256: + return hmac.new( + secret_key.encode('utf-8'), + signing_string.encode('utf-8'), + hashlib.sha256 + ).hexdigest() + else: + return hashlib.md5(signing_string.encode('utf-8')).hexdigest() + + def create_signing_string(self, params: Dict[str, str]) -> str: + """ + 创建用于签名的规范化字符串 + + Args: + params: 请求参数 + + Returns: + 按键名排序并拼接的字符串 + """ + # 排除签名参数 + sorted_params = {k: v for k, v in params.items() if k != self.options.signature_name} + + # 获取所有键并排序 + keys = sorted(sorted_params.keys()) + + # 构建签名字符串 + parts = [] + for key in keys: + value = sorted_params[key] + # 仅对非字母数字字符进行URL编码 + if self.needs_url_encode(value): + value = urllib.parse.quote(value, safe='') + parts.append(f"{key}={value}") + + return '&'.join(parts) + + def needs_url_encode(self, s: str) -> bool: + """ + 判断是否需要对字符串进行URL编码 + + Args: + s: 需要判断的字符串 + + Returns: + 如果包含非字母数字字符,返回True,否则返回False + """ + return bool(re.search(r'[^a-zA-Z0-9]', s)) + + def verify_signature( + self, + params: Dict[str, str], + secret_key: str, + max_age_ms: int = 300000 + ) -> Tuple[bool, Optional[str]]: + """ + 验证签名 + + Args: + params: 所有请求参数,包括签名 + secret_key: 密钥 + max_age_ms: 允许的最大时间差(毫秒) + + Returns: + 验证结果和错误信息,成功为(True, None),失败为(False, 错误信息) + """ + # 获取并验证必要参数 + if self.options.key_name not in params: + return False, f"缺少参数: {self.options.key_name}" + + if self.options.channel_id_name not in params: + return False, f"缺少参数: {self.options.channel_id_name}" + + if self.options.timestamp_name not in params: + return False, f"缺少参数: {self.options.timestamp_name}" + + try: + timestamp = int(params[self.options.timestamp_name]) + except ValueError: + return False, "无效的时间戳" + + now = self.get_timestamp() + if abs(now - timestamp) > max_age_ms: + return False, "时间戳过期" + + if self.options.nonce_name not in params: + return False, f"缺少参数: {self.options.nonce_name}" + + # 这里可以插入nonce验证逻辑,防止重放攻击 + # 服务端需要维护一个时效性的nonce存储 + + if self.options.signature_name not in params: + return False, f"缺少参数: {self.options.signature_name}" + + provided_signature = params[self.options.signature_name] + + # 计算签名并比较 + expected_signature = self.calculate_signature(params, secret_key) + if expected_signature == provided_signature: + return True, None + else: + return False, "签名不匹配" + + +# 用于命令行直接调用的简单示例 +if __name__ == "__main__": + import argparse + import json + import os + from urllib.parse import parse_qsl + + # # 加载环境变量 + # dotenv.load_dotenv() + + def parse_args(): + parser = argparse.ArgumentParser(description='API签名工具') + parser.add_argument( + '-a', '--algorithm', + default=os.environ.get('SIGN_ALGORITHM', 'MD5'), + help='签名算法: MD5, SHA1, SHA256, HMAC_SHA256' + ) + parser.add_argument( + '-k', '--access-key-id', + default=os.environ.get('ACCESS_KEY_ID', 'test-access-key-id'), + help='访问密钥ID' + ) + parser.add_argument( + '-c', '--channel-id', + default=os.environ.get('CHANNEL_ID', 'test-channel-id'), + help='合作渠道方ID' + ) + parser.add_argument( + '-s', '--secret-key', + default=os.environ.get('SECRET_KEY', 'test-secret-key'), + help='密钥' + ) + parser.add_argument( + '-u', '--url', + default=os.environ.get('API_BASE_URL', 'https://api.example.com/v1/data'), + help='基础URL地址' + ) + parser.add_argument( + '-p', '--param', + action='append', + help='请求参数,格式为key=value,可多次指定' + ) + parser.add_argument( + '-j', '--json', + help='请求参数,JSON格式' + ) + parser.add_argument( + '-m', '--mode', + choices=['url', 'params', 'verify'], + default='url', + help='操作模式: url - 签名URL, params - 签名参数, verify - 验证签名' + ) + return parser.parse_args() + + def main(): + args = parse_args() + + # 解析算法 + try: + algorithm = SignatureAlgorithm.from_str(args.algorithm) + except ValueError as e: + print(f"警告: {e},使用默认MD5算法") + algorithm = SignatureAlgorithm.MD5 + + # 创建签名选项 + options = SignOptions(algorithm=algorithm) + + # 创建签名工具 + signer = ApiSigner(options) + + # 解析参数 + params = {} + if args.param: + for param in args.param: + if '=' in param: + key, value = param.split('=', 1) + params[key] = value + + if args.json: + try: + json_params = json.loads(args.json) + if isinstance(json_params, dict): + params.update(json_params) + except json.JSONDecodeError: + print("警告: 无效的JSON参数") + + # 如果没有参数,使用默认参数进行演示 + if not params: + params = { + 'userId': '12345', + 'action': 'getData', + 'data': '测试数据', # 包含非ASCII字符,测试URL编码 + } + + print("===================== API签名示例 =====================") + print(f"AccessKeyId: {args.access_key_id}") + print(f"ChannelId: {args.channel_id}") + print(f"SecretKey: {args.secret_key}") + print(f"签名算法: {algorithm}") + print(f"基础URL: {args.url}") + print("请求参数:", params) + + if args.mode == 'url': + # 签名URL + signed_url = signer.sign_url(args.url, params, args.access_key_id, args.secret_key, args.channel_id) + print("\n签名后的URL:") + print(signed_url) + + elif args.mode == 'params': + # 获取签名后的参数 + signed_params = signer.sign_request(params, args.access_key_id, args.secret_key, args.channel_id) + print("\n签名后的参数:") + for key, value in signed_params.items(): + print(f" {key}: {value}") + + elif args.mode == 'verify': + # 如果参数中已经包含签名,直接验证 + if options.signature_name in params: + valid, error = signer.verify_signature(params, args.secret_key) + print("\n签名验证结果:") + if valid: + print(" 验证成功") + else: + print(f" 验证失败: {error}") + else: + # 先签名再验证(演示用) + signed_params = signer.sign_request(params, args.access_key_id, args.secret_key, args.channel_id) + valid, error = signer.verify_signature(signed_params, args.secret_key) + print("\n签名验证结果:") + if valid: + print(" 验证成功") + else: + print(f" 验证失败: {error}") + + # 演示不同算法的签名结果 + print("\n不同算法的签名结果:") + for alg in SignatureAlgorithm: + temp_options = SignOptions(algorithm=alg) + temp_signer = ApiSigner(temp_options) + + # 添加必要的参数用于签名 + sign_params = params.copy() + sign_params[options.key_name] = args.access_key_id + sign_params[options.channel_id_name] = args.channel_id + + signature = temp_signer.calculate_signature(sign_params, args.secret_key) + print(f" {alg}: {signature}") + + main() \ No newline at end of file diff --git a/python/requirements.txt b/python/requirements.txt new file mode 100644 index 0000000..3e338bf --- /dev/null +++ b/python/requirements.txt @@ -0,0 +1 @@ +python-dotenv \ No newline at end of file