180 lines
6.7 KiB
Python
180 lines
6.7 KiB
Python
|
|
from langchain.tools import BaseTool
|
|||
|
|
from pydantic import BaseModel, Field
|
|||
|
|
from typing import Optional, Type, Literal, ClassVar
|
|||
|
|
import datetime
|
|||
|
|
import pytz
|
|||
|
|
import logging
|
|||
|
|
|
|||
|
|
# Module-level logger shared by the tool; registered under the fixed name "datetime_tool".
logger = logging.getLogger("datetime_tool")
|
|||
|
|
|
|||
|
|
# 定义输入参数模型(使用Pydantic替代原get_parameters())
|
|||
|
|
class DateTimeInput(BaseModel):
    # Argument schema for the date/time tool.
    #
    # Only ``operation`` is mandatory; every other field carries an explicit
    # default so a caller can omit whatever the chosen operation does not use.
    # Documentation is kept in comments (not a class docstring) so the JSON
    # schema generated from this model is not altered.

    # Which operation to perform; restricted to the five supported names.
    operation: Literal["current_time", "timezone_convert", "date_diff", "add_time", "format_date"] = Field(
        description="操作类型: current_time(当前时间), timezone_convert(时区转换), "
                    "date_diff(日期差), add_time(时间加减), format_date(格式化日期)"
    )
    # Time zone name used to interpret ``date_string`` / the current time.
    timezone: Optional[str] = Field(
        default="UTC",
        description="时区名称 (e.g., 'UTC', 'Asia/Shanghai')"
    )
    # Input date. An explicit ``default=None`` is required here: without it,
    # Pydantic v2 treats an ``Optional[...]`` field as nullable *but required*,
    # which would reject calls such as ``current_time`` that pass no date.
    date_string: Optional[str] = Field(
        default=None,
        description="日期字符串 (格式: YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS)"
    )
    # Destination zone for ``timezone_convert``; same explicit-default
    # reasoning as ``date_string``.
    target_timezone: Optional[str] = Field(
        default=None,
        description="目标时区(用于时区转换)"
    )
    # Day offset for ``add_time`` (may be negative).
    days: Optional[int] = Field(
        default=0,
        description="要加减的天数"
    )
    # Hour offset for ``add_time`` (may be negative).
    hours: Optional[int] = Field(
        default=0,
        description="要加减的小时数"
    )
    # strftime pattern used for every formatted value in the result.
    # The field name shadows the ``format`` builtin but is part of the tool's
    # public argument names, so it is kept.
    format: Optional[str] = Field(
        default="%Y-%m-%d %H:%M:%S",
        description="日期格式字符串 (e.g., '%Y-%m-%d %H:%M:%S')"
    )
|
|||
|
|
|
|||
|
|
class DateTimeTool(BaseTool):
    """Date/time tool: current time, time zone conversion, date difference,
    date arithmetic and date formatting.

    Every operation returns a dict. On success it has ``status == "success"``,
    a ``result`` mapping and a human-readable ``summary``; on failure it has
    ``status == "error"``, the exception text in ``message`` and the requested
    ``operation``. Exceptions are never propagated out of ``_run``.
    """

    name: ClassVar[str] = "datetime_tool"
    description: ClassVar[str] = """执行日期时间相关操作,包括:
- 获取当前时间
- 时区转换
- 计算日期差
- 日期时间加减
- 格式化日期
使用时必须指定operation参数确定操作类型。"""
    args_schema: Type[BaseModel] = DateTimeInput

    def _parse_datetime(self, date_string: str, timezone_str: str = "UTC") -> datetime.datetime:
        """Parse ``date_string`` and attach the time zone ``timezone_str``.

        The candidate formats are tried in order and the first match wins.
        Day-first slash dates (``%d/%m/%Y``) are listed before month-first
        ones, so an ambiguous value such as ``03/04/2024`` is read as 3 April.

        Args:
            date_string: Date or date-time text in one of the supported formats.
            timezone_str: Zone name passed to ``pytz.timezone``.

        Returns:
            A timezone-aware ``datetime`` localized to ``timezone_str``.

        Raises:
            ValueError: If no supported format matches ``date_string``.
            Whatever ``pytz.timezone`` raises for an unrecognised zone name.
        """
        tz = pytz.timezone(timezone_str)
        formats = (
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%d %H:%M",
            "%Y-%m-%d",
            "%Y/%m/%d %H:%M:%S",
            "%Y/%m/%d",
            "%d/%m/%Y %H:%M:%S",
            "%d/%m/%Y",
            "%m/%d/%Y %H:%M:%S",
            "%m/%d/%Y",
        )

        for fmt in formats:
            try:
                dt = datetime.datetime.strptime(date_string, fmt)
                # localize() (rather than replace(tzinfo=...)) is how pytz
                # attaches a zone to a naive datetime.
                return tz.localize(dt)
            except ValueError:
                continue
        raise ValueError(f"无法解析日期字符串: {date_string}")

    def _run(self,
             operation: str,
             timezone: str = "UTC",
             date_string: Optional[str] = None,
             target_timezone: Optional[str] = None,
             days: int = 0,
             hours: int = 0,
             format: str = "%Y-%m-%d %H:%M:%S") -> dict:
        """Execute one date/time operation synchronously.

        Args:
            operation: One of ``current_time``, ``timezone_convert``,
                ``date_diff``, ``add_time`` or ``format_date``.
            timezone: Zone used for "now" and for interpreting ``date_string``.
            date_string: Input date; required for ``timezone_convert`` and
                ``date_diff``, optional for ``add_time`` / ``format_date``
                (which fall back to the current time).
            target_timezone: Destination zone; required for ``timezone_convert``.
            days: Day offset for ``add_time``.
            hours: Hour offset for ``add_time``.
            format: strftime pattern for formatted values (the name shadows
                the builtin but is part of the public argument names).

        Returns:
            A success dict (``status``, ``result``, ``summary``) or an error
            dict (``status``, ``message``, ``operation``). For ``date_diff``,
            ``days`` and ``hours`` are whole units truncated toward zero and
            carry the sign of the difference (negative when the date is in
            the past); ``total_seconds`` is the exact signed difference.
        """
        logger.info("执行日期时间操作: %s", operation)

        # The argument schema declares these as Optional, so an explicit null
        # can arrive; fall back to the documented defaults instead of crashing
        # inside pytz / timedelta / strftime.
        if timezone is None:
            timezone = "UTC"
        if format is None:
            format = "%Y-%m-%d %H:%M:%S"
        if days is None:
            days = 0
        if hours is None:
            hours = 0

        try:
            if operation == "current_time":
                tz = pytz.timezone(timezone)
                now = datetime.datetime.now(tz)
                return {
                    "status": "success",
                    "result": {
                        "formatted": now.strftime(format),
                        "iso": now.isoformat(),
                        "timestamp": now.timestamp(),
                        "timezone": timezone
                    },
                    "summary": f"当前时间 ({timezone}): {now.strftime(format)}"
                }

            elif operation == "timezone_convert":
                if not date_string or not target_timezone:
                    raise ValueError("必须提供date_string和target_timezone参数")

                source_dt = self._parse_datetime(date_string, timezone)
                target_dt = source_dt.astimezone(pytz.timezone(target_timezone))

                return {
                    "status": "success",
                    "result": {
                        "source": source_dt.strftime(format),
                        "target": target_dt.strftime(format),
                        "source_tz": timezone,
                        "target_tz": target_timezone
                    },
                    "summary": f"时区转换: {source_dt.strftime(format)} → {target_dt.strftime(format)}"
                }

            elif operation == "date_diff":
                if not date_string:
                    raise ValueError("必须提供date_string参数")

                target_dt = self._parse_datetime(date_string, timezone)
                current_dt = datetime.datetime.now(pytz.timezone(timezone))
                delta = target_dt - current_dt

                # Compare the whole delta, not delta.days: a target a few
                # hours ahead has days == 0 but is still in the future.
                is_future = delta > datetime.timedelta(0)

                # A negative timedelta is stored floor-normalised (e.g. -5h is
                # days=-1, seconds=68400), so split the absolute value and
                # re-apply the sign to get meaningful day/hour components.
                magnitude = abs(delta)
                whole_days = magnitude.days
                whole_hours = magnitude.seconds // 3600
                sign = 1 if is_future else -1

                return {
                    "status": "success",
                    "result": {
                        "days": sign * whole_days,
                        "hours": sign * whole_hours,
                        "total_seconds": delta.total_seconds(),
                        "is_future": is_future
                    },
                    "summary": f"日期差: {whole_days}天 {whole_hours}小时"
                }

            elif operation == "add_time":
                if date_string:
                    base_dt = self._parse_datetime(date_string, timezone)
                else:
                    base_dt = datetime.datetime.now(pytz.timezone(timezone))
                # NOTE(review): adding a timedelta to a pytz-aware datetime
                # keeps the original UTC offset, so the result's offset can be
                # stale across a DST transition. Decide whether wall-clock or
                # elapsed-time semantics are wanted before normalising.
                new_dt = base_dt + datetime.timedelta(days=days, hours=hours)

                return {
                    "status": "success",
                    "result": {
                        "original": base_dt.strftime(format),
                        "new": new_dt.strftime(format),
                        "delta": f"{days}天 {hours}小时"
                    },
                    "summary": f"时间计算: {base_dt.strftime(format)} + {days}天 {hours}小时 = {new_dt.strftime(format)}"
                }

            elif operation == "format_date":
                if date_string:
                    dt = self._parse_datetime(date_string, timezone)
                else:
                    dt = datetime.datetime.now(pytz.timezone(timezone))
                formatted = dt.strftime(format)

                return {
                    "status": "success",
                    "result": {
                        "original": dt.isoformat(),
                        "formatted": formatted
                    },
                    "summary": f"格式化结果: {formatted}"
                }

            else:
                raise ValueError(f"未知操作类型: {operation}")

        except Exception as e:
            # Tool boundary: report every failure as a structured error dict
            # instead of raising.
            logger.error("操作失败: %s", e)
            return {
                "status": "error",
                "message": str(e),
                "operation": operation
            }

    async def _arun(self, **kwargs):
        """Async entry point; delegates directly to the synchronous ``_run``."""
        return self._run(**kwargs)
|