123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- # -*- encoding: utf-8 -*-
- '''
- @File : api.py
- @Time : 2023年06月19日 20:20:58 星期一
- @Author : erma0
- @Version : 0.1
- @Link : https://erma0.cn
- @Desc : 抖音爬虫封装API
- '''
- import os
- import shutil
- import stat
- import threading
- import time
- from copy import deepcopy
- from enum import Enum
- from typing import List, Union
- import ujson as json
- import uvicorn
- from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import FileResponse, UJSONResponse
- from pydantic import BaseModel
- from uvicorn.config import LOGGING_CONFIG
- from browser import Browser
- from spider import Douyin
# Timestamp layout shared by uvicorn's "default" and "access" log formatters.
date_fmt = "%Y-%m-%d %H:%M:%S"

# Patch uvicorn's stock logging configuration in place so that every log
# record starts with its timestamp; one update per formatter.
LOGGING_CONFIG["formatters"]["access"].update(
    fmt='%(asctime)s %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s',
    datefmt=date_fmt,
)
LOGGING_CONFIG["formatters"]["default"].update(
    fmt="%(asctime)s %(levelprefix)s %(message)s",
    datefmt=date_fmt,
)
# Project metadata: passed to the FastAPI constructor for the generated docs
# and used as default values by the root endpoint's response model.
title: str = '抖音爬虫API'
version: str = "0.1.0"
update: str = "20230619"
author: str = "erma0"
web: str = "https://douyin.erma0.cn/"
github: str = "https://github.com/erma0/douyin"
# The documentation page lives directly under the public web root.
doc: str = f"{web}docs"
# Markdown body rendered at the top of the generated API documentation.
description: str = """
### ❤️开源不易,欢迎star⭐
- 支持采集账号主页作品、喜欢作品、音乐原声作品、搜索作品、关注列表、粉丝列表、合集作品、单个作品
### 📢声明
> 本仓库为学习`playwright`爬虫、命令行调用`Aria2`及`FastAPI/AMIS/Eel`实现`WEBUI`的案例,仅用于测试和学习研究,禁止用于商业用途或任何非法用途。
> 任何用户直接或间接使用、传播本仓库内容时责任自负,本仓库的贡献者不对该等行为产生的任何后果负责。
> 如果相关方认为该项目的代码可能涉嫌侵犯其权利,请及时通知删除相关代码。
"""
contact = dict(author=author, url=github)
# OpenAPI tag groups. The first tag marks fast endpoints that return their
# result directly; the second marks slow endpoints that cannot, and hand
# back a polling URL instead.
tags_metadata = [
    {"name": tag_name, "description": tag_text}
    for tag_name, tag_text in (
        ("同步", "耗时短的接口,直接返回结果"),
        ("异步", "耗时长的接口,无法直接返回结果,返回一个轮询接口"),
    )
]
# Shared browser instance; stays None until start_browser() creates it.
edge = None
# Keys ("<type>_<url>") of the crawls currently running as background tasks.
running_ls: List[str] = []
# Directory holding the per-task status/result files served by the API.
download_path: str = './下载_API'
# The application object; all metadata feeds the generated OpenAPI docs.
app = FastAPI(
    title=title,
    version=version,
    description=description,
    contact=contact,
    openapi_tags=tags_metadata,
)

# Allow cross-origin requests from any origin, with any method and any header.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
class DouyinAPI(Douyin):
    """Crawler variant that records its progress on disk after every batch.

    After delegating to the base-class append hook, the status file
    ``<download_path>/<type>_<id>.json`` is overwritten with ``code`` 1
    ("running") and the number of results collected so far.
    """

    def __write_progress(self):
        """Overwrite this task's status file with a "still running" snapshot."""
        # The double underscore name-mangles this helper, so it cannot clash
        # with an attribute defined by the Douyin base class.
        status_file = f"{download_path}/{self.type}_{self.id}.json"
        snapshot = {'code': 1, 'num': len(self.results)}
        with open(status_file, 'w', encoding='utf-8') as fp:
            json.dump(snapshot, fp, ensure_ascii=False)

    def _append_awemes(self, aweme_list: List[dict]):
        """Append a batch of video records, then refresh the status file."""
        super()._append_awemes(aweme_list)
        self.__write_progress()

    def _append_users(self, user_list: List[dict]):
        """Append a batch of user records, then refresh the status file."""
        super()._append_users(user_list)
        self.__write_progress()
def running(a: DouyinAPI, target: str):
    """Run a crawl to completion and publish its final result file.

    Intended to be executed as a background task. On success the status file
    ``<download_path>/<type>_<id>.json`` is overwritten with ``code`` 0, the
    result count and the full result list.

    Args:
        a: The prepared crawler to run.
        target: The key under which this task was registered in ``running_ls``.
    """
    try:
        a.run()
        with open(f"{download_path}/{a.type}_{a.id}.json", 'w', encoding='utf-8') as f:
            json.dump({
                'code': 0,
                'num': len(a.results),
                'data': a.results,
            }, f, ensure_ascii=False)
    finally:
        # Release the slot even when the crawl or the write fails; otherwise
        # the target would be reported as "running" forever and could never
        # be started again.
        if target in running_ls:
            running_ls.remove(target)
class API(BaseModel):
    """Response schema of the root endpoint: basic service metadata.

    Each field falls back to the matching module-level constant when the
    handler does not supply a value.
    """

    Version: str = version
    Update: str = update
    Web: str = web
    GitHub: str = github
    Doc: str = doc
    # NOTE: this default is computed once, when the class body is executed,
    # not each time a response is built.
    Time: str = time.ctime()
class DataVideo(BaseModel):
    """Schema of a single video/post record returned by the API.

    Only ``id``, ``desc`` and ``download_addr`` are required. Every other
    field is optional and defaults to ``None``. Optional fields are annotated
    as ``Union[X, None]`` so the annotation itself admits the ``None``
    default instead of relying on implicit-Optional handling.
    """

    id: str
    desc: str
    # Either a single address or a list of addresses.
    download_addr: Union[str, List[str]]
    time: Union[int, None] = None
    # Interaction counters, snake_case spelling.
    digg_count: Union[int, None] = None
    share_count: Union[int, None] = None
    collect_count: Union[int, None] = None
    comment_count: Union[int, None] = None
    # Interaction counters, camelCase spelling.
    # NOTE(review): presumably a second upstream payload shape - confirm in spider.
    diggCount: Union[int, None] = None
    shareCount: Union[int, None] = None
    collectCount: Union[int, None] = None
    commentCount: Union[int, None] = None
    liveWatchCount: Union[int, None] = None
    music_title: Union[str, None] = None
    music_url: Union[str, None] = None
    cover: Union[str, None] = None
    tags: Union[List[dict], None] = None
class DataUser(BaseModel):
    """Schema of a single user/account record returned by the API.

    Only ``nickname`` and ``sec_uid`` are required. Every other field is
    optional and defaults to ``None``. Optional fields are annotated as
    ``Union[X, None]`` so the annotation itself admits the ``None`` default
    instead of relying on implicit-Optional handling.
    """

    nickname: str
    sec_uid: str
    uid: Union[str, None] = None
    signature: Union[str, None] = None
    avatar: Union[str, None] = None
    short_id: Union[str, None] = None
    unique_id: Union[str, None] = None
    unique_id_modify_time: Union[int, None] = None
    aweme_count: Union[int, None] = None
    favoriting_count: Union[int, None] = None
    follower_count: Union[int, None] = None
    following_count: Union[int, None] = None
    constellation: Union[int, None] = None
    create_time: Union[int, None] = None
    enterprise_verify_reason: Union[str, None] = None
    is_gov_media_vip: Union[bool, None] = None
    total_favorited: Union[int, None] = None
    share_qrcode_uri: Union[str, None] = None
class DataSync(BaseModel):
    """Schema of a task status/result document."""

    # 0: finished; 1: still running.
    code: int = 0
    # Number of records collected so far.
    num: int = 0
    # Collected records; annotated as Union[..., None] because the default is
    # None rather than a list.
    data: Union[List[Union[DataVideo, DataUser]], None] = None
class DataAsync(BaseModel):
    """Reply schema of the endpoints that start a background crawl."""

    # 0: task accepted; 1: the same task is already running.
    code: int = 0
    # Address to poll for the task's status and result.
    url: str
class TypeAsync(str, Enum):
    """Crawl types accepted by the background-task endpoints.

    Members are also ``str`` instances, and each member's value equals its
    name.
    """

    post = "post"
    like = "like"
    favorite = "favorite"
    music = "music"
    search = "search"
    collection = "collection"
    follow = "follow"
    fans = "fans"
@app.get(
    "/",
    response_class=UJSONResponse,
    response_model=API,
)
def api(req: Request):
    """Return service metadata, with links rewritten to this deployment.

    ``Web`` and ``Doc`` are derived from the URL the request arrived on. The
    remaining fields come from the response model's defaults.
    """
    # str() is the public way to read the URL; avoids the private ``_url``.
    base = str(req.base_url)
    return {
        'Web': base,
        'Doc': f'{base}docs',
        # The model's own default for Time is evaluated only once, when the
        # class is defined, so supply the current time on every request.
        'Time': time.ctime(),
    }
@app.get(
    "/api/video",
    response_class=UJSONResponse,
    response_model=DataVideo,
    tags=['同步'],
    response_model_exclude_unset=True,
    response_model_exclude_defaults=True,
)
def get_video(url: str):
    """Crawl a single video synchronously and return its record.

    Args:
        url: Address of the video, passed to the crawler unchanged.

    Raises:
        HTTPException: 404 when the crawl produced no result.
    """
    start_browser()
    a = DouyinAPI(context=edge.context, url=url, type='video', down_path=download_path)
    a.run()
    # An empty result list would otherwise surface as an IndexError (HTTP 500).
    if not a.results:
        raise HTTPException(status_code=404, detail="目标不存在")
    return deepcopy(a.results[0])
@app.get(
    "/api/user",
    response_class=UJSONResponse,
    response_model=DataUser,
    tags=['同步'],
    response_model_exclude_unset=True,
    response_model_exclude_defaults=True,
)
def get_user(id: str):
    """Crawl a single account synchronously and return its record.

    Args:
        id: Identifier of the account; handed to the crawler as its ``url``
            argument with crawl type ``'id'``.

    Raises:
        HTTPException: 404 when the crawl produced no result.
    """
    start_browser()
    a = DouyinAPI(context=edge.context, url=id, type='id', down_path=download_path)
    a.run()
    # An empty result list would otherwise surface as an IndexError (HTTP 500).
    if not a.results:
        raise HTTPException(status_code=404, detail="目标不存在")
    return deepcopy(a.results[0])
@app.get("/api/info", response_class=UJSONResponse, tags=['同步'])
def get_info(url: str):
    """Run the crawler once for ``url`` and return a copy of its ``info``.

    Args:
        url: Target address, passed to the crawler unchanged.
    """
    start_browser()
    crawler = DouyinAPI(context=edge.context, url=url, down_path=download_path)
    # NOTE(review): presumably stops the crawler from paging through further
    # results - confirm against spider.Douyin.
    crawler.has_more = False
    crawler.run()
    info_copy = deepcopy(crawler.info)
    return info_copy
@app.get(
    "/api/{type_async}",
    response_class=UJSONResponse,
    response_model=DataAsync,
    tags=['异步'],
    status_code=201,
)
def start_async(type_async: TypeAsync, url: str, background_tasks: BackgroundTasks, req: Request):
    """Schedule a background crawl and return the address to poll.

    Args:
        type_async: Kind of crawl to run.
        url: Target address; its stripped form is part of the task key.
        background_tasks: Queue that runs the crawl after the response is sent.
        req: Incoming request, used to build the absolute polling URL.

    Returns:
        ``code`` 0 when a new task was queued, 1 when the same target is
        already running, plus the polling ``url`` in both cases.
    """
    start_browser()
    kind = type_async.value
    target = f"{kind}_{url.strip()}"
    # The crawler is built even for a duplicate request: its ``id`` is needed
    # for the polling URL.
    crawler = DouyinAPI(context=edge.context, url=url, type=kind, down_path=download_path)
    already_running = target in running_ls
    if not already_running:
        running_ls.append(target)
        background_tasks.add_task(running, crawler, target)
    poll_url = f"{req.base_url._url}api/{kind}/{crawler.id}"
    return {'code': 1 if already_running else 0, 'url': poll_url}
@app.get(
    "/api/{type_async}/{id}",
    response_class=UJSONResponse,
    response_model=DataSync,
    tags=['同步'],
    response_model_exclude_unset=True,
    response_model_exclude_defaults=True,
)
def async_result(type_async: TypeAsync, id: str, down: bool = False):
    """Serve the status/result file of a background crawl.

    Args:
        type_async: Kind of crawl the task belongs to.
        id: Task identifier, as embedded in the polling URL.
        down: When true, serve the ``.txt`` file instead of the ``.json`` one.

    Raises:
        HTTPException: 404 when no such file exists.
    """
    suffix = 'txt' if down else 'json'
    name = f"{type_async.value}_{id}.{suffix}"
    file = f"{download_path}/{name}"
    # ``id`` is caller-supplied: refuse anything that contains a path
    # separator so the lookup cannot leave the download directory. isfile()
    # rather than exists() also rejects directories, which cannot be served.
    if os.path.basename(name) != name or not os.path.isfile(file):
        raise HTTPException(status_code=404, detail="目标不存在")
    return FileResponse(file)
@app.get("/t", response_class=UJSONResponse)
def tt():
    """Debug endpoint: block for three seconds, printing before and after."""
    delay_seconds = 3
    print('start')
    time.sleep(delay_seconds)
    print('end')
    return dict(url="type")
@app.get("/init")
def start_browser():
    """Create the shared browser on first use; do nothing on later calls.

    Also exposed as ``GET /init`` so the browser can be started ahead of the
    first crawl request.
    """
    global edge
    if isinstance(edge, Browser):
        return
    edge = Browser(headless=False)
def remove_readonly(func, path, _):
    """Make ``path`` writable, then call ``func`` on it again.

    Has the three-argument shape of a ``shutil.rmtree`` error handler and is
    used as one, so that read-only entries can still be removed.

    Args:
        func: The operation to retry, called with ``path`` as its only argument.
        path: The entry that should be made writable.
        _: Unused third handler argument.
    """
    writable = stat.S_IWRITE
    os.chmod(path, writable)
    func(path)
def clean_download_path():
    """Purge stale entries from the download directory, forever.

    Every ten minutes, each file or folder directly inside ``download_path``
    whose modification time is more than one hour old is deleted. The
    function never returns and is meant to run in a daemon thread.
    """
    root = download_path
    max_age = 60 * 60 * 1  # seconds: entries older than one hour are removed
    interval = 60 * 10     # seconds between two sweeps

    while True:
        now = time.time()
        try:
            # exist_ok avoids the check-then-create race, and re-creates the
            # directory should it disappear between sweeps.
            os.makedirs(root, exist_ok=True)
            entries = os.listdir(root)
        except OSError:
            entries = []
        for file in entries:
            file_path = os.path.join(root, file)
            try:
                if now - os.path.getmtime(file_path) <= max_age:
                    continue
                if os.path.isfile(file_path):
                    os.remove(file_path)
                else:
                    shutil.rmtree(file_path, onerror=remove_readonly)
            except OSError:
                # The entry vanished or is locked. Skip it rather than let
                # the exception end this thread; the next sweep retries.
                continue
        time.sleep(interval)
@app.on_event("startup")
def startup():
    """Application start-up hook: launch the download-folder cleaner."""
    # Daemon thread, so the endless cleanup loop never blocks process exit.
    cleaner = threading.Thread(target=clean_download_path, daemon=True)
    cleaner.start()
if __name__ == '__main__':
    # Development alternative from the command line (with auto-reload):
    #   uvicorn api:app --host '0.0.0.0' --port 567 --reload
    uvicorn.run(
        "api:app",
        host='0.0.0.0',
        port=567,
        limit_concurrency=5,
        use_colors=True,
    )
|