1. BONE API / OpenAI General Interface

Batch Image Processing Example

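The script below scans a local directory for images, downscales and re-encodes each one to keep requests small, then sends them concurrently to the chat completions endpoint with the gpt-4o-2024-08-06 model to recognize the text they contain, appending the results to a text file. It relies on the openai, pillow, and tqdm packages (for example, pip install openai pillow tqdm, depending on your environment).
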
import os
import base64
import time
import asyncio
from datetime import datetime
from tqdm import tqdm
from openai import AsyncOpenAI
import logging
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
import io

class ImageProcessor:
    def __init__(self, image_dir, output_file, api_key, base_url, max_workers=10):
        self.image_dir = image_dir
        self.output_file = output_file
        self.api_key = api_key
        self.base_url = base_url
        self.max_workers = max_workers
        self.setup_logging()
        self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
        
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('processing.log'),
                logging.StreamHandler()
            ]
        )

    def optimize_image(self, image_path, max_size=800):
        """优化图片大小,减少传输数据量"""
        try:
            with Image.open(image_path) as img:
                # Convert to RGB so the image can be saved as JPEG
                if img.mode != 'RGB':
                    img = img.convert('RGB')
                
                # Downscale if the longest side exceeds max_size
                if max(img.size) > max_size:
                    ratio = max_size / max(img.size)
                    new_size = tuple(int(dim * ratio) for dim in img.size)
                    img = img.resize(new_size, Image.Resampling.LANCZOS)
                
                # Encode as JPEG into an in-memory buffer
                buffer = io.BytesIO()
                img.save(buffer, format='JPEG', quality=85, optimize=True)
                return base64.b64encode(buffer.getvalue()).decode('utf-8')
        except Exception as e:
            logging.error(f"Error optimizing image {image_path}: {str(e)}")
            return None

    async def process_single_image(self, image_name):
        try:
            image_path = os.path.join(self.image_dir, image_name)

            # Run the CPU-bound image optimization in a worker thread so the event loop is not blocked
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor() as executor:
                base64_image = await loop.run_in_executor(
                    executor, self.optimize_image, image_path
                )
            
            if not base64_image:
                return None

            response = await self.client.chat.completions.create(
                model="gpt-4o-2024-08-06",
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": "识别图片的文字:"},
                            {
                                "type": "image_url",
                                "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"},
                            },
                        ],
                    }
                ],
            )
            
            # Strip markdown symbols and whitespace from the model output
            result = response.choices[0].message.content.replace("*", "").replace("#", "").replace(" ", "")
            return f"File: {image_name}\nResult: {result}\nTime: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n{'='*80}\n"
            
        except Exception as e:
            logging.error(f"Error processing image {image_name}: {str(e)}")
            return None

    async def process_batch(self, batch):
        # The AsyncOpenAI client manages its own HTTP connections, so no extra session is needed here.
        tasks = [self.process_single_image(img) for img in batch]
        return await asyncio.gather(*tasks)

    def chunk_list(self, lst, chunk_size):
        """将列表分成更小的批次"""
        return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]

    async def process_images(self):
        start_time = time.time()
        
        try:
            images = [f for f in os.listdir(self.image_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif'))]
            
            if not images:
                logging.warning(f"No images found in {self.image_dir}")
                return
            
            logging.info(f"Found {len(images)} images to process")
            
            with open(self.output_file, 'a', encoding='utf-8') as f:
                f.write(f"处理开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n{'='*80}\n")
            
            # Split the image list into batches of size max_workers
            batches = self.chunk_list(images, self.max_workers)
            
            with tqdm(total=len(images), desc="Processing images") as pbar:
                for batch in batches:
                    results = await self.process_batch(batch)
                    for result in results:
                        if result:
                            with open(self.output_file, 'a', encoding='utf-8') as f:
                                f.write(result)
                    pbar.update(len(batch))
            
            elapsed_time = time.time() - start_time
            summary = f"\n总处理时间: {elapsed_time:.2f}秒\n处理的图片数量: {len(images)}\n平均每张图片处理时间: {elapsed_time/len(images):.2f}秒\n"
            
            with open(self.output_file, 'a', encoding='utf-8') as f:
                f.write(summary)
            
            logging.info(f"Processing completed. Results saved to {self.output_file}")
            
        except Exception as e:
            logging.error(f"Error during processing: {str(e)}")

async def main():
    processor = ImageProcessor(
        image_dir="images",
        output_file="results.txt",
        api_key="sk-vxxxx",
        base_url="https://open.api.gu28.top/v1",
        max_workers=20  # 增加并发数
    )
    await processor.process_images()

if __name__ == "__main__":
    asyncio.run(main())
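
Before launching a full batch, it can help to confirm that the api_key and base_url are accepted with a single request. The following is a minimal sketch that reuses the same endpoint, placeholder key, and model name as the example above; the smoke_test function name is only illustrative.

import asyncio
from openai import AsyncOpenAI

async def smoke_test():
    # One minimal chat completion against the same endpoint used by the batch script.
    client = AsyncOpenAI(base_url="https://open.api.gu28.top/v1", api_key="sk-vxxxx")
    response = await client.chat.completions.create(
        model="gpt-4o-2024-08-06",
        messages=[{"role": "user", "content": "ping"}],
    )
    print(response.choices[0].message.content)

if __name__ == "__main__":
    asyncio.run(smoke_test())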