添加Python实现的API签名工具,包括核心库、命令行工具和使用文档,支持多种签名算法和环境变量配置。

This commit is contained in:
SF-bytebytebrew
2025-05-21 15:36:26 +08:00
parent 07b9de3eec
commit f9f75b7a02
3 changed files with 627 additions and 0 deletions

167
python/README.md Normal file
View File

@@ -0,0 +1,167 @@
# API签名工具 - Python实现
## 项目结构
根据实际项目文件结构:
```plaintext
python/
├── api_signer.py # 签名工具实现(单文件版)
├── requirements.txt # 依赖配置
```
## 使用方法
### 安装依赖
```bash
pip install -r requirements.txt
```
### 运行命令行工具
```bash
python api_signer.py [选项]
```
### 命令行选项
| 选项 | 描述 |
|------|------|
| `-a, --algorithm` | 签名算法: md5, sha1, sha256, hmac-sha256 |
| `-u, --url` | API基础URL |
| `-p, --param` | 请求参数格式为key=value可多次使用 |
| `-k, --key` | 访问密钥ID |
| `-s, --secret` | 密钥 |
| `-c, --channel` | 合作渠道方ID |
| `-h, --help` | 显示帮助信息 |
| `-j, --json` | 以JSON格式指定参数 |
### 常用命令示例
**基本用法**
```bash
python api_signer.py
```
**自定义参数**
```bash
python api_signer.py \
-u "https://api.example.com/user/info" \
-p "userId=12345" -p "action=getInfo" \
-k "YOUR_ACCESS_KEY" \
-s "YOUR_SECRET_KEY" \
-c "3"
```
**使用JSON参数**
```bash
python api_signer.py -j '{"userId": "12345", "action": "getData"}'
```
**指定签名算法**
```bash
python api_signer.py -a sha256
```
**帮助信息**
```bash
python api_signer.py --help
```
### API接口测试实例
使用真实API接口进行测试
```bash
# 未签名的API调用测试 - 返回错误
curl "https://api-v1.sound-force.com:8443/p/album/single/media-url?channelId=3&singleId=381980"
# 返回: {"code":400,"data":null,"msg":"Missing AccessKeyId","success":false}
# 生成访问https://api-v1.sound-force.com:8443/p/album/single/media-url的签名URL
python api_signer.py \
-a md5 \
-u "https://api-v1.sound-force.com:8443/p/album/single/media-url" \
-p "singleId=381980" \
-k "YOUR_ACCESS_KEY" \
-s "YOUR_SECRET_KEY" \
-c "3"
# 使用curl测试API接口
signed_url=$(python api_signer.py \
-a md5 \
-u "https://api-v1.sound-force.com:8443/p/album/single/media-url" \
-p "singleId=381980" \
-k "YOUR_ACCESS_KEY" \
-s "YOUR_SECRET_KEY" \
-c "3" | grep -A 1 "签名后的URL:" | tail -n 1)
curl -v "$signed_url"
# 使用Python requests库测试
python -c "
import requests
import subprocess
url = subprocess.check_output(\"python api_signer.py -a md5 -u 'https://api-v1.sound-force.com:8443/p/album/single/media-url' -p 'singleId=381980' -k 'YOUR_ACCESS_KEY' -s 'YOUR_SECRET_KEY' -c '3' | grep -A 1 '签名后的URL:' | tail -n 1\", shell=True).decode().strip()
response = requests.get(url)
print(f'状态码: {response.status_code}')
print(f'响应内容: {response.text}')
"
```
请注意:
- 替换`YOUR_ACCESS_KEY`为实际的访问密钥ID
- 替换`YOUR_SECRET_KEY`为实际的密钥
- 示例使用的渠道ID为`3`,请根据实际情况调整
使用有效的密钥和签名后API接口将返回成功响应(状态码200)并提供媒体URL数据。
### 代码集成
```python
from api_signer import ApiSigner, SignOptions, SignatureAlgorithm
# 创建签名选项
options = SignOptions(algorithm=SignatureAlgorithm.MD5)
# 创建签名工具
signer = ApiSigner(options)
# 准备请求参数
params = {
"singleId": "381980"
}
# 执行签名
signed_params = signer.sign_request(
params,
"YOUR_ACCESS_KEY",
"YOUR_SECRET_KEY",
"3"
)
# 或签名URL
signed_url = signer.sign_url(
"https://api-v1.sound-force.com:8443/p/album/single/media-url",
params,
"YOUR_ACCESS_KEY",
"YOUR_SECRET_KEY",
"3"
)
```
### 环境变量
该工具支持从`.env`文件加载以下配置:
- `ACCESS_KEY_ID`: 访问密钥ID
- `SECRET_KEY`: 密钥
- `CHANNEL_ID`: 渠道ID
- `SIGN_ALGORITHM`: 签名算法
- `API_BASE_URL`: API基础URL

459
python/api_signer.py Normal file
View File

@@ -0,0 +1,459 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API签名工具
提供API请求签名与验证功能支持多种签名算法。
"""
import hashlib
import hmac
import re
import time
import urllib.parse
from enum import Enum, auto
from typing import Dict, List, Optional, Tuple, Union
class SignatureAlgorithm(Enum):
"""签名算法类型"""
MD5 = auto()
SHA1 = auto()
SHA256 = auto()
HMAC_SHA256 = auto()
@staticmethod
def from_str(algorithm_str: str) -> 'SignatureAlgorithm':
"""从字符串解析算法类型"""
algorithm_str = algorithm_str.upper()
if algorithm_str == 'MD5':
return SignatureAlgorithm.MD5
elif algorithm_str == 'SHA1':
return SignatureAlgorithm.SHA1
elif algorithm_str == 'SHA256':
return SignatureAlgorithm.SHA256
elif algorithm_str in ('HMAC_SHA256', 'HMACSHA256', 'HMAC-SHA256'):
return SignatureAlgorithm.HMAC_SHA256
else:
raise ValueError(f"无效的签名算法: {algorithm_str}")
def __str__(self) -> str:
if self == SignatureAlgorithm.MD5:
return "MD5"
elif self == SignatureAlgorithm.SHA1:
return "SHA1"
elif self == SignatureAlgorithm.SHA256:
return "SHA256"
elif self == SignatureAlgorithm.HMAC_SHA256:
return "HMAC-SHA256"
return "UNKNOWN"
class SignOptions:
"""签名选项"""
def __init__(
self,
algorithm: SignatureAlgorithm = SignatureAlgorithm.MD5,
key_name: str = "AccessKeyId",
channel_id_name: str = "channelId",
timestamp_name: str = "timestamp",
nonce_name: str = "nonce",
signature_name: str = "sign"
):
"""
初始化签名选项
Args:
algorithm: 签名算法
key_name: AccessKeyId参数名
channel_id_name: 合作渠道方ID参数名
timestamp_name: 时间戳参数名
nonce_name: 随机字符串参数名
signature_name: 签名参数名
"""
self.algorithm = algorithm
self.key_name = key_name
self.channel_id_name = channel_id_name
self.timestamp_name = timestamp_name
self.nonce_name = nonce_name
self.signature_name = signature_name
class ApiSigner:
"""API签名工具"""
def __init__(self, options: Optional[SignOptions] = None):
"""
初始化签名工具
Args:
options: 签名选项如为None则使用默认选项
"""
self.options = options if options is not None else SignOptions()
def generate_nonce(self) -> str:
"""
生成随机字符串
Returns:
一个基于当前时间的随机字符串
"""
return f"{int(time.time() * 1000)}{int(time.time()) % 1000}"
def get_timestamp(self) -> int:
"""
获取当前时间戳(毫秒)
Returns:
当前的Unix时间戳毫秒
"""
return int(time.time() * 1000)
def sign_request(
self,
params: Dict[str, str],
access_key_id: str,
secret_key: str,
channel_id: str
) -> Dict[str, str]:
"""
对请求进行签名
Args:
params: 请求参数
access_key_id: 访问密钥ID
secret_key: 密钥
channel_id: 合作渠道方ID
Returns:
添加了签名的完整参数
"""
# 创建用于签名的参数副本
signed_params = params.copy()
# 添加认证参数
timestamp = self.get_timestamp()
signed_params[self.options.key_name] = access_key_id
signed_params[self.options.channel_id_name] = channel_id
signed_params[self.options.timestamp_name] = str(timestamp)
signed_params[self.options.nonce_name] = self.generate_nonce()
# 计算签名
signature = self.calculate_signature(signed_params, secret_key)
# 添加签名到参数
signed_params[self.options.signature_name] = signature
return signed_params
def sign_url(
self,
base_url: str,
params: Dict[str, str],
access_key_id: str,
secret_key: str,
channel_id: str
) -> str:
"""
对URL进行签名
Args:
base_url: 基础URL地址
params: 请求参数
access_key_id: 访问密钥ID
secret_key: 密钥
channel_id: 合作渠道方ID
Returns:
添加了签名的完整URL
"""
# 对请求签名
signed_params = self.sign_request(params, access_key_id, secret_key, channel_id)
# 构建URL查询字符串
query_string = urllib.parse.urlencode(signed_params)
# 处理已有参数的URL
if '?' in base_url:
return f"{base_url}&{query_string}"
else:
return f"{base_url}?{query_string}"
def calculate_signature(self, params: Dict[str, str], secret_key: str) -> str:
"""
计算签名
Args:
params: 请求参数
secret_key: 密钥
Returns:
签名字符串
"""
# 按键名排序并生成规范化的请求字符串
signing_string = self.create_signing_string(params)
# 添加密钥
signing_string = f"{signing_string}&key={secret_key}"
# 使用指定算法计算签名
if self.options.algorithm == SignatureAlgorithm.MD5:
return hashlib.md5(signing_string.encode('utf-8')).hexdigest()
elif self.options.algorithm == SignatureAlgorithm.SHA1:
return hashlib.sha1(signing_string.encode('utf-8')).hexdigest()
elif self.options.algorithm == SignatureAlgorithm.SHA256:
return hashlib.sha256(signing_string.encode('utf-8')).hexdigest()
elif self.options.algorithm == SignatureAlgorithm.HMAC_SHA256:
return hmac.new(
secret_key.encode('utf-8'),
signing_string.encode('utf-8'),
hashlib.sha256
).hexdigest()
else:
return hashlib.md5(signing_string.encode('utf-8')).hexdigest()
def create_signing_string(self, params: Dict[str, str]) -> str:
"""
创建用于签名的规范化字符串
Args:
params: 请求参数
Returns:
按键名排序并拼接的字符串
"""
# 排除签名参数
sorted_params = {k: v for k, v in params.items() if k != self.options.signature_name}
# 获取所有键并排序
keys = sorted(sorted_params.keys())
# 构建签名字符串
parts = []
for key in keys:
value = sorted_params[key]
# 仅对非字母数字字符进行URL编码
if self.needs_url_encode(value):
value = urllib.parse.quote(value, safe='')
parts.append(f"{key}={value}")
return '&'.join(parts)
def needs_url_encode(self, s: str) -> bool:
"""
判断是否需要对字符串进行URL编码
Args:
s: 需要判断的字符串
Returns:
如果包含非字母数字字符返回True否则返回False
"""
return bool(re.search(r'[^a-zA-Z0-9]', s))
def verify_signature(
self,
params: Dict[str, str],
secret_key: str,
max_age_ms: int = 300000
) -> Tuple[bool, Optional[str]]:
"""
验证签名
Args:
params: 所有请求参数,包括签名
secret_key: 密钥
max_age_ms: 允许的最大时间差(毫秒)
Returns:
验证结果和错误信息,成功为(True, None),失败为(False, 错误信息)
"""
# 获取并验证必要参数
if self.options.key_name not in params:
return False, f"缺少参数: {self.options.key_name}"
if self.options.channel_id_name not in params:
return False, f"缺少参数: {self.options.channel_id_name}"
if self.options.timestamp_name not in params:
return False, f"缺少参数: {self.options.timestamp_name}"
try:
timestamp = int(params[self.options.timestamp_name])
except ValueError:
return False, "无效的时间戳"
now = self.get_timestamp()
if abs(now - timestamp) > max_age_ms:
return False, "时间戳过期"
if self.options.nonce_name not in params:
return False, f"缺少参数: {self.options.nonce_name}"
# 这里可以插入nonce验证逻辑防止重放攻击
# 服务端需要维护一个时效性的nonce存储
if self.options.signature_name not in params:
return False, f"缺少参数: {self.options.signature_name}"
provided_signature = params[self.options.signature_name]
# 计算签名并比较
expected_signature = self.calculate_signature(params, secret_key)
if expected_signature == provided_signature:
return True, None
else:
return False, "签名不匹配"
# 用于命令行直接调用的简单示例
if __name__ == "__main__":
import argparse
import json
import os
from urllib.parse import parse_qsl
# # 加载环境变量
# dotenv.load_dotenv()
def parse_args():
parser = argparse.ArgumentParser(description='API签名工具')
parser.add_argument(
'-a', '--algorithm',
default=os.environ.get('SIGN_ALGORITHM', 'MD5'),
help='签名算法: MD5, SHA1, SHA256, HMAC_SHA256'
)
parser.add_argument(
'-k', '--access-key-id',
default=os.environ.get('ACCESS_KEY_ID', 'test-access-key-id'),
help='访问密钥ID'
)
parser.add_argument(
'-c', '--channel-id',
default=os.environ.get('CHANNEL_ID', 'test-channel-id'),
help='合作渠道方ID'
)
parser.add_argument(
'-s', '--secret-key',
default=os.environ.get('SECRET_KEY', 'test-secret-key'),
help='密钥'
)
parser.add_argument(
'-u', '--url',
default=os.environ.get('API_BASE_URL', 'https://api.example.com/v1/data'),
help='基础URL地址'
)
parser.add_argument(
'-p', '--param',
action='append',
help='请求参数格式为key=value可多次指定'
)
parser.add_argument(
'-j', '--json',
help='请求参数JSON格式'
)
parser.add_argument(
'-m', '--mode',
choices=['url', 'params', 'verify'],
default='url',
help='操作模式: url - 签名URL, params - 签名参数, verify - 验证签名'
)
return parser.parse_args()
def main():
args = parse_args()
# 解析算法
try:
algorithm = SignatureAlgorithm.from_str(args.algorithm)
except ValueError as e:
print(f"警告: {e}使用默认MD5算法")
algorithm = SignatureAlgorithm.MD5
# 创建签名选项
options = SignOptions(algorithm=algorithm)
# 创建签名工具
signer = ApiSigner(options)
# 解析参数
params = {}
if args.param:
for param in args.param:
if '=' in param:
key, value = param.split('=', 1)
params[key] = value
if args.json:
try:
json_params = json.loads(args.json)
if isinstance(json_params, dict):
params.update(json_params)
except json.JSONDecodeError:
print("警告: 无效的JSON参数")
# 如果没有参数,使用默认参数进行演示
if not params:
params = {
'userId': '12345',
'action': 'getData',
'data': '测试数据', # 包含非ASCII字符测试URL编码
}
print("===================== API签名示例 =====================")
print(f"AccessKeyId: {args.access_key_id}")
print(f"ChannelId: {args.channel_id}")
print(f"SecretKey: {args.secret_key}")
print(f"签名算法: {algorithm}")
print(f"基础URL: {args.url}")
print("请求参数:", params)
if args.mode == 'url':
# 签名URL
signed_url = signer.sign_url(args.url, params, args.access_key_id, args.secret_key, args.channel_id)
print("\n签名后的URL:")
print(signed_url)
elif args.mode == 'params':
# 获取签名后的参数
signed_params = signer.sign_request(params, args.access_key_id, args.secret_key, args.channel_id)
print("\n签名后的参数:")
for key, value in signed_params.items():
print(f" {key}: {value}")
elif args.mode == 'verify':
# 如果参数中已经包含签名,直接验证
if options.signature_name in params:
valid, error = signer.verify_signature(params, args.secret_key)
print("\n签名验证结果:")
if valid:
print(" 验证成功")
else:
print(f" 验证失败: {error}")
else:
# 先签名再验证(演示用)
signed_params = signer.sign_request(params, args.access_key_id, args.secret_key, args.channel_id)
valid, error = signer.verify_signature(signed_params, args.secret_key)
print("\n签名验证结果:")
if valid:
print(" 验证成功")
else:
print(f" 验证失败: {error}")
# 演示不同算法的签名结果
print("\n不同算法的签名结果:")
for alg in SignatureAlgorithm:
temp_options = SignOptions(algorithm=alg)
temp_signer = ApiSigner(temp_options)
# 添加必要的参数用于签名
sign_params = params.copy()
sign_params[options.key_name] = args.access_key_id
sign_params[options.channel_id_name] = args.channel_id
signature = temp_signer.calculate_signature(sign_params, args.secret_key)
print(f" {alg}: {signature}")
main()

1
python/requirements.txt Normal file
View File

@@ -0,0 +1 @@
python-dotenv