Python 多线程处理表格 demo
import pandas as pd
import os
import concurrent.futures # 用于并发请求
from time import sleep
from openai import OpenAI # 导入OpenAI库,确保安装了openai库: pip install openai
# Chat client for an OpenAI-compatible API, reached through the third-party
# endpoint in base_url (the requests below use model "gpt-4o").
# The key can be supplied via the CHAT_API_KEY environment variable so the
# secret does not need to be committed to source; "sk-xxx" is only a placeholder
# fallback. A script-specific variable name is used deliberately: reusing
# OPENAI_API_KEY could forward a real OpenAI key to this third-party endpoint.
client = OpenAI(
    base_url="https://chatapi.littlewheat.com/v1",
    api_key=os.environ.get("CHAT_API_KEY", "sk-xxx"),  # or replace the placeholder with your key
)
# Load the input table; the processing loop reads its 'title' and 'content' columns.
input_file = 'process_products.csv'  # replace with the path to your CSV file
df = pd.read_csv(input_file)
# One result slot per row, filled in by row index as worker tasks complete.
optimized_titles = [None for _ in range(len(df))]
supplemented_contents = [None for _ in range(len(df))]
# Model name and sampling temperature shared by both API helpers below.
_MODEL = "gpt-4o"
_TEMPERATURE = 0.7
# Seconds to pause before each retry.
_RETRY_DELAY_SECONDS = 1


def _chat_with_retry(messages, retries, error_prefix, retry_message):
    """Send one chat-completion request, retrying on failure.

    Makes ``max(retries, 0) + 1`` attempts in total, pausing
    ``_RETRY_DELAY_SECONDS`` between attempts. Any exception raised while
    calling the client, or an empty reply, counts as a failed attempt.

    Args:
        messages: The ``messages`` list passed to ``client.chat.completions.create``.
        retries: Number of additional attempts after the first one fails.
        error_prefix: Text printed before the error on each failed attempt.
        retry_message: Text printed before each retry.

    Returns:
        The stripped reply text, or ``None`` if every attempt failed.
    """
    attempts = max(retries, 0) + 1
    for attempt in range(attempts):
        try:
            completion = client.chat.completions.create(
                model=_MODEL,
                messages=messages,
                temperature=_TEMPERATURE,
            )
            # The SDK types message.content as optional; treat None like an empty reply.
            reply = (completion.choices[0].message.content or "").strip()
        except Exception as e:  # best effort: any client/network error just triggers a retry
            print(f"{error_prefix}: {e}")
        else:
            if reply:
                return reply
            print(f"{error_prefix}: empty response")
        if attempt < attempts - 1:
            print(retry_message)
            sleep(_RETRY_DELAY_SECONDS)
    return None


def optimize_title(title, content, retries=1):
    """Ask the model for a Google-SEO-optimized version of *title*.

    Args:
        title: The original title. It may be a non-string, e.g. NaN for an
            empty CSV cell.
        content: The row's body text, sent as context.
        retries: Extra attempts after the first failure (default 1, i.e. up to
            two requests).

    Returns:
        The model's stripped reply, or the original *title* unchanged if every
        attempt failed or returned an empty reply.
    """
    # str() so a non-string title (e.g. NaN) cannot crash the log line; only the
    # first 30 characters are shown to keep logs short.
    print(f"Optimizing title for: {str(title)[:30]}...")
    messages = [
        {"role": "system", "content": "按照提供的现有标题,根据谷歌SEO来帮我重新优化标题,直接给出结果,不要有其他多余的解释"},
        {"role": "user", "content": f"标题是: '{title}', 内容是: '{content}', 回复我优化后的标题,直接给出标题,不要有其他多余的解释"},
    ]
    reply = _chat_with_retry(
        messages, retries, "Error optimizing title", "Retrying title optimization..."
    )
    return title if reply is None else reply


def supplement_content(title, content, retries=1):
    """Ask the model to extend *content* without altering the original text.

    Args:
        title: The row's title, sent as context. It may be a non-string, e.g. NaN.
        content: The original body text to be supplemented.
        retries: Extra attempts after the first failure (default 1, i.e. up to
            two requests).

    Returns:
        The model's stripped reply, or the original *content* unchanged if every
        attempt failed or returned an empty reply.
    """
    print(f"Supplementing content for: {str(title)[:30]}...")
    # NOTE(review): the system prompt asks for the full optimized content, while the
    # user prompt asks for only the supplement. Confirm which output is intended.
    messages = [
        {"role": "system", "content": "按照提供的内容,在提供的原始内容后面帮我补充一些内容,切记不要改动原始内容,返回直接输出优化后的内容,不要多余的解释"},
        {"role": "user", "content": f"标题是: '{title}', 内容是: '{content}', 提供的原始内容后面帮我补充一些内容,切记不要改动原始内容,直接给出补充内容,不要有其他多余的解释"},
    ]
    reply = _chat_with_retry(
        messages, retries, "Error supplementing content", "Retrying content supplementation..."
    )
    return content if reply is None else reply
# Worker task run in the thread pool: processes one row end to end.
def process_row(index, title, content):
    """Run both API transformations for a single row.

    The title optimization runs first, then the content supplementation.
    Both receive the row's original title and content.

    Returns:
        ``(index, optimized_title, supplemented_content)``, so the caller can
        store the results at the right row even when tasks finish out of order.
    """
    return (
        index,
        optimize_title(title, content),
        supplement_content(title, content),
    )
# Process every row concurrently. The work is dominated by network-bound API
# calls, so a small thread pool overlaps the waiting time.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Map each future back to the positional row index it was submitted for.
    # Positions (not DataFrame labels) are used so the index always fits the
    # pre-allocated result lists.
    futures = {
        executor.submit(process_row, pos, title, content): pos
        for pos, (title, content) in enumerate(zip(df['title'], df['content']))
    }
    # Collect results in completion order and store them by row index.
    for future in concurrent.futures.as_completed(futures):
        index = futures[future]
        try:
            i, optimized_title, supplemented_content = future.result()
        except Exception as e:
            print(f"Error processing row {index}: {e}")
            continue
        optimized_titles[i] = optimized_title
        supplemented_contents[i] = supplemented_content
        # str() because a failed row falls back to its original title, which
        # may be a non-string (e.g. NaN for an empty cell).
        print(f"Processed row {i+1}/{len(df)}: {str(optimized_title)[:30]}...")
# Attach the results as two new columns, appended after the input's existing columns.
df['optimized_title'] = optimized_titles
df['supplemented_content'] = supplemented_contents
# Output file path.
output_file = 'optimized_output.csv'
# to_csv opens the file in write mode, which already truncates any existing
# content, so no separate "clear the file first" step is needed.
# utf-8-sig prepends a UTF-8 byte-order mark so BOM-sniffing readers (e.g. Excel)
# decode the Chinese text correctly.
df.to_csv(output_file, index=False, encoding='utf-8-sig')
print(f"Processing complete. Results saved to '{output_file}'.")
`
修改于 2025-03-28 01:20:10