Lofter存档助手
2021/7/13 23:41:33
本文主要是介绍Lofter存档助手,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
声明:该代码仅为个人业余练习产物,供爱好者保存tag下图文到本地作为收藏,不得用于商业用途、侵犯他人著作权。
效果演示见https://www.bilibili.com/video/BV18h411v7kN拖到最后五分钟
一、准备工作
本工具使用PyQt5开发图形界面,这一步不具体展开记录。之后导出ui文件,然后转换为py文件。在命令行输入
pyuic5 download.ui -o download.py
即可完成转换。
图形界面中的windowIcon要单独导出为qrc文件,然后转换为py文件。在命令行输入
pyrcc5 -o logo.py logo.qrc
即可完成转换。
如果频繁访问,会被网易的反爬虫机制制裁,需要事先设定多个User-Agent和ip池来模拟不同用户浏览器(做了这步还是可能会被封ip,我也不知道为什么)。用fake_useragent库异步测试哪些User-Agent可以用来爬Lofter。
# request_headers.py
"""Probe which random User-Agent strings Lofter accepts and pickle the usable ones."""
import asyncio
import os
import pickle
import time

import aiohttp
from fake_useragent import UserAgent


def get_agents(count=10001):
    """Return a de-duplicated list of random User-Agent strings.

    :param count: number of random draws; the default equals the original
        loop, which ran for n = 0..10000 inclusive
    :return: list of unique User-Agent strings
    """
    # One generator is enough; `.random` is read once per draw.
    generator = UserAgent()
    agent_list = list({generator.random for _ in range(count)})
    print(len(agent_list))
    return agent_list


async def send_request(useragent, url):
    """Request *url* once with *useragent*; record the agent on HTTP 200.

    Usable agents are appended to the module-level ``agent_ok`` list.
    """
    headers = {'user-agent': useragent}
    # aiohttp's documented forms: a ClientTimeout object and ssl=False
    # (the older `timeout=<number>` / `verify_ssl=False` are deprecated).
    timeout = aiohttp.ClientTimeout(total=15)
    async with aiohttp.ClientSession() as session:
        try:
            print('正在测试: ', useragent)
            async with session.get(url=url, headers=headers, timeout=timeout, ssl=False) as response:
                print(response.status)
                if response.status == 200:
                    print('User-agent可用: ', useragent)
                    agent_ok.append(useragent)
                    await asyncio.sleep(1)
                else:
                    print('请求响应码不合法:', useragent)
        except Exception as err:
            # `Exception` (not a bare except) so task cancellation is not swallowed;
            # the reason is printed to make failed probes diagnosable.
            print('请求失败', useragent, repr(err))


async def main():
    """Probe every candidate in the module-level ``agents`` list concurrently."""
    # gather() accepts coroutines directly; asyncio.wait() rejects bare
    # coroutines on Python 3.11+ and fails on an empty collection.
    await asyncio.gather(*(send_request(agent, url) for agent in agents))


if __name__ == '__main__':
    url = 'https://www.lofter.com/tag/TAG/new?page=1'
    agents = get_agents()
    agent_ok = []
    print('开始测试: ')
    try:
        # Create the loop explicitly: get_event_loop() without a running loop
        # is deprecated on recent Python versions.
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(main())
        finally:
            loop.close()
    except Exception as err:
        print('发生错误:', err.args)
    print(agent_ok)
    # Make sure the output directory exists before writing the pickle.
    os.makedirs('./useragent', exist_ok=True)
    with open('./useragent/agent.pickle', 'wb') as f:
        pickle.dump(agent_ok, f)
ip池的获取与之类似。这里我是参考一位大佬的方法。
回到download.py,主函数:
from sys import exit
from random import choice
import logo  # resource module generated by pyrcc5 from logo.qrc (window icon)

if __name__ == '__main__':
    agent_list = ['USERAGENT']  # your User-Agent strings
    useragent = choice(agent_list)
    headers = {'User-Agent': useragent}
    # requests picks the proxy by URL scheme. Every Lofter URL this tool
    # fetches is https, so with only an 'http' entry the proxy was never used.
    # Register the same proxy for both schemes.
    proxy_url = 'http://XXX.XXX.XXX.X:XXXX'  # your proxy (entry from the ip pool)
    proxies = {'http': proxy_url, 'https': proxy_url}
    app = QApplication([])
    download = Download()
    download.show()
    exit(app.exec_())
二、现在考虑这个工具需要具备的功能。
以前用过一位大佬制作的存文工具,其文件命名为“作者名+标题”,实际上会漏掉一些标题相同(或没有标题)的文章。发布时间是以时间戳的形式呈现的,命名规则中加上时间能尽可能地减少遗漏。
from time import localtime, strftime


def timestamp13_to_date(target_timestamp, date_format='%y%m%d-%H%M'):
    """Format a millisecond timestamp as a local-time string.

    :param target_timestamp: millisecond timestamp (13 digits); anything
        accepted by ``int()``
    :param date_format: ``strftime`` format of the result
    :return: the formatted time string
    """
    # Milliseconds -> seconds, then to a local struct_time.
    seconds = float(int(target_timestamp) / 1000)
    local_struct = localtime(seconds)
    return strftime(date_format, local_struct)
在爬取过程中,需要将日志以字符串形式显示在QTextBrowser中。自定义一个信号类。
自定义一个Download类。将两个pushButton绑到两个函数上,点击pushButton就能开始执行下载图、文的任务。
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *


class MySignals(QObject):
    """Holder for a custom signal that carries one string."""

    text_print = pyqtSignal(str)


class Download(QMainWindow):
    """Main window: each push button starts a task, each text browser shows its log."""

    def __init__(self):
        super().__init__()

        # Build the interface from the generated UI class.
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)

        # One signal object per log pane; emitted text is appended to that pane.
        self.ms = MySignals()
        self.ms_2 = MySignals()
        self.ms.text_print.connect(self.printToGui)
        self.ms_2.text_print.connect(self.printToGui_2)

        # Buttons start the image task and the article task respectively.
        self.ui.pushButton.clicked.connect(self.handleCalc)
        self.ui.pushButton_2.clicked.connect(self.handleCalc_2)

    def printToGui(self, text):
        """Append *text* to ``textBrowser`` and keep the cursor visible."""
        browser = self.ui.textBrowser
        browser.append(text)
        browser.ensureCursorVisible()

    def printToGui_2(self, text):
        """Append *text* to ``textBrowser_2`` and keep the cursor visible."""
        browser = self.ui.textBrowser_2
        browser.append(text)
        browser.ensureCursorVisible()
采用Requests库请求数据,xpath和selectolax定位。selectolax定位速度快,可以做到正文内容、排版跟原来一样。Selectolax库的Github地址
爬虫是 I/O 密集型任务,适合交给线程池来执行(参见 concurrent.futures 线程池官方文档)。
from threading import Thread from lxml.etree import HTML from time import sleep from os.path import isfile from os import getcwd from requests import get from selectolax.parser import HTMLParser from re import sub from concurrent.futures import ThreadPoolExecutor from random import uniform
图:
def handleCalc(self):
    """Start a background thread that saves the images of posts under a tag.

    The tag name, cut-off date, minimum popularity and blocked tags are read
    from the UI. Listing pages are processed one after another until a post
    older than the cut-off date is met or there is no next-page button.
    Progress is reported through ``self.ms.text_print``.
    """

    def run():
        tag = self.ui.plainTextEdit.toPlainText()  # tag name
        end_date = self.ui.dateEdit.date().toString('yyMMdd')  # cut-off date
        # Minimum popularity: blank means 0; anything non-numeric is rejected
        # here instead of killing the worker thread with a ValueError.
        hot_text = self.ui.plainTextEdit_2.toPlainText().strip()
        try:
            min_hot = int(hot_text) if hot_text else 0
        except ValueError:
            self.ms.text_print.emit('请重新输入热度')
            return
        # Blocked tags, comma separated; blanks and padding are dropped.
        shield_tag = [word.strip()
                      for word in self.ui.plainTextEdit_3.toPlainText().split(",")
                      if word.strip()]
        self.ms.text_print.emit('正在运行,请稍等')

        def fetch(url, **kwargs):
            """GET *url*, retrying every 10 seconds until the request succeeds."""
            # Deliberate best effort: transient failures must not end the crawl.
            while True:
                try:
                    return get(url=url, **kwargs)
                except Exception:
                    sleep(10)

        def get_pic(page):
            """Process one listing page; return True when crawling should stop."""
            r_text = fetch(f'https://www.lofter.com/tag/{tag}/new?page={page}',
                           headers=headers, proxies=proxies).text
            sleep(uniform(0, 0.1))
            for p in HTML(r_text).xpath('//*[@id="main"]/div[@data-blogid]'):
                # Only image posts are handled here.
                if not p.xpath('./div[2]/div/div[2]/div[2]/div/div/div[@class="img"]'):
                    continue
                timestamp = p.xpath('./div[2]/div/div[1]/a/@data-time')[0]
                date = timestamp13_to_date(timestamp)
                # A post older than the cut-off date ends the whole crawl.
                if date[:6] < end_date:
                    return True
                try:
                    hot = p.xpath('./div[2]/div/div[2]/div[3]/div[2]/span[1]/a/text()')[0][3:-1] or '0'
                except IndexError:
                    hot = '0'
                # Popularity filter.
                if int(hot) < min_hot:
                    continue
                pic_tag = p.xpath('./div[2]/div/div[2]/div[3]/div[1]/span/a/span/text()')
                # Skip posts carrying any blocked tag.
                if any(word in pic_tag for word in shield_tag):
                    continue
                url = p.xpath('./div[2]/div/div[1]/a/@href')[0]
                html = HTML(fetch(url, headers=headers, proxies=proxies).text)
                pic = html.xpath('//*[@imggroup="gal"]/img/@src')
                # Replace characters that are not allowed in file names.
                name = sub(r"[\/\\\:\*\?\"\<\>\|\\\n]", "-", p.xpath('./@data-blognickname')[0])
                for index, src in enumerate(pic, start=1):
                    title = f'{date}_{name}_({index}).jpg'
                    upath = f'{getcwd()}/{title}'
                    if isfile(upath):
                        self.ms.text_print.emit('已存在:' + title)
                        continue
                    # Drop the query string before requesting the image.
                    image = fetch(src.split("?")[0]).content
                    with open(upath, 'wb') as f:
                        f.write(image)
                    self.ms.text_print.emit(title)
            # No next-page button means the last page has been processed.
            # NOTE(review): '\000' is a NUL escape in a Python literal; the
            # selector is kept verbatim -- confirm against the original script.
            if not HTMLParser(r_text).css_first('span.w-iar2\000r'):
                return True
            return False

        try:
            with ThreadPoolExecutor() as pool:
                for page in range(1, 100000000):
                    future = pool.submit(get_pic, page)
                    if future.result():
                        break
            self.ms.text_print.emit('END')
        # Raised when the window is closed while the task is running.
        except RuntimeError:
            pass

    # `daemon=True` replaces the deprecated Thread.setDaemon().
    t = Thread(target=run, daemon=True)
    t.start()
文:
由于文章页存在很多模板,正文可能会存在于各种标签下,我自己手动找到了15种,应该能覆盖99%的情况。
def handleCalc_2(self):
    """Start a background thread that saves the text of article posts under a tag.

    The tag name, cut-off date, minimum popularity and blocked tags are read
    from the UI. Listing pages are processed one after another until a post
    older than the cut-off date is met or there is no next-page button.
    Progress is reported through ``self.ms_2.text_print``.
    """

    # Containers the article body may live in, tried in this order.
    # NOTE(review): '\000' is a NUL escape in a Python literal; the selectors
    # are kept verbatim -- confirm against the original script.
    selectors = (
        'div[class=content]',
        'div[class=txtcont]',
        'div[class=contt]',
        'div[class=cnt\000box]',
        'div[class=detail-ct]',
        'div[class=post-ct]',
        'div[class=listitm\000regular]',
        'div[class=ctc\000box]',
        'div[class=icontent]',
        'div[class=textc]',
        'div[class=postdesc]',
        'div[class=cnttxt]',
        'div[class=posttext]',
        'div[class=text]',
        'div[class=cont]',
    )

    def run_2():
        tag = self.ui.plainTextEdit_4.toPlainText()  # tag name
        end_date = self.ui.dateEdit_2.date().toString('yyMMdd')  # cut-off date
        # Minimum popularity: blank means 0; anything non-numeric is rejected
        # here instead of killing the worker thread with a ValueError.
        hot_text = self.ui.plainTextEdit_5.toPlainText().strip()
        try:
            min_hot = int(hot_text) if hot_text else 0
        except ValueError:
            self.ms_2.text_print.emit('请重新输入热度')
            return
        # Blocked tags, comma separated; blanks and padding are dropped.
        shield_tag = [word.strip()
                      for word in self.ui.plainTextEdit_6.toPlainText().split(",")
                      if word.strip()]
        self.ms_2.text_print.emit('正在运行,请稍等')

        def fetch(url):
            """GET *url*, retrying every 10 seconds until the request succeeds."""
            # Deliberate best effort: transient failures must not end the crawl.
            while True:
                try:
                    return get(url=url, headers=headers, proxies=proxies)
                except Exception:
                    sleep(10)

        def get_art(page):
            """Process one listing page; return True when crawling should stop."""
            r2_text = fetch(f'https://www.lofter.com/tag/{tag}/new?page={page}').text
            sleep(uniform(0, 0.1))
            for p in HTML(r2_text).xpath('//*[@id="main"]/div[@data-blogid]'):
                # Only article posts are handled here.
                if not p.xpath('./div[2]/div/div[2]/div[2]/div/div[1]/div[@class="txt js-digest ptag"]'):
                    continue
                timestamp = p.xpath('./div[2]/div/div[1]/a/@data-time')[0]
                date = timestamp13_to_date(timestamp)
                # A post older than the cut-off date ends the whole crawl.
                if date[:6] < end_date:
                    return True
                try:
                    hot = p.xpath('./div[2]/div/div[2]/div[3]/div[2]/span[1]/a/text()')[0][3:-1] or '0'
                except IndexError:
                    hot = '0'
                # Popularity filter.
                if int(hot) < min_hot:
                    continue
                art_tag = p.xpath('./div[2]/div/div[2]/div[3]/div[1]/span/a/span/text()')
                # Skip posts carrying any blocked tag.
                if any(word in art_tag for word in shield_tag):
                    continue
                # Article title, with a placeholder for untitled posts.
                try:
                    tit = p.xpath('./div[2]/div/div[2]/div[2]/div/h2/text()')[0]
                except IndexError:
                    tit = '无题'
                name = p.xpath('./@data-blognickname')[0]
                # Replace characters that are not allowed in file names.
                title = sub(r"[\/\\\:\*\?\"\<\>\|\\\n]", "-", f'{date}_{name}_{tit}.txt')
                upath = f'{getcwd()}/{title}'
                if isfile(upath):
                    self.ms_2.text_print.emit('已存在:' + title)
                    continue
                url = p.xpath('./div[2]/div/div[1]/a/@href')[0]
                parser = HTMLParser(fetch(url).text.replace('<br />', '\n'))
                # First selector that matches anything wins; none -> empty list.
                res = next((found for found in (parser.css(sel) for sel in selectors) if found), [])
                with open(upath, 'a+', encoding='utf-8') as f:
                    # The tag list is written via its repr, where a non-breaking
                    # space shows up as the literal text \xa0; turn that into a space.
                    f.write(f'热度:{hot} tag:{art_tag} 日期:{date[:6]} 原文链接:{url}\n'.replace(r'\xa0', ' '))
                    content = ''.join(
                        node.text(deep=True, separator='', strip=False) + '\n'
                        for j in res
                        for node in HTMLParser(j.html).css('p')
                    )
                    # No 'p' tags at all: fall back to the containers' own text.
                    if not content:
                        content = ''.join(
                            node.text(deep=True, separator='', strip=False) + '\n'
                            for node in res
                        )
                    f.write(content.replace('\n\n\n', '\n\n').replace('\n\n\n\n', '\n\n\n'))
                self.ms_2.text_print.emit(title)
            # No next-page button means the last page has been processed.
            if not HTMLParser(r2_text).css_first('span.w-iar2\000r'):
                return True
            return False

        try:
            with ThreadPoolExecutor() as pool:
                for page in range(1, 100000000):
                    future = pool.submit(get_art, page)
                    if future.result():
                        break
            self.ms_2.text_print.emit('END')
        # Raised when the window is closed while the task is running.
        except RuntimeError:
            pass

    # `daemon=True` replaces the deprecated Thread.setDaemon().
    t_2 = Thread(target=run_2, daemon=True)
    t_2.start()
三、不足
还存在几个小问题。一是Lofter网页版本身有时间线混乱的问题,这种情况比较少见,一旦出现会引起严重强迫症。二是。。(改了再说)
四、拓展
一个没什么用的功能,输入多个tag名称、屏蔽tag、起止年月,可以生成一个用来对比这些tag的各月新增趋势的Excel表格。
from pandas import DataFrame, concat
Download类下加上这个函数:
def handleCalc_3(self):
    """Start a background thread that builds a per-month trend table for several tags.

    The tag names, blocked tags and the start/end month are read from the UI.
    For every tag the listing is walked newest-first, counting new posts and
    the highest popularity per month; the result is written to an Excel file.
    Progress is reported through ``self.ms_3.text_print``.

    NOTE(review): ``self.ms_3`` is not created in the ``__init__`` shown in
    this article; it has to be set up like ``self.ms`` / ``self.ms_2``.
    """

    def run_3():
        # Comma separated inputs; blanks and padding are dropped.
        top_cp = [cp.strip() for cp in self.ui.plainTextEdit_7.toPlainText().split(",") if cp.strip()]
        shield_tag = [word.strip() for word in self.ui.plainTextEdit_8.toPlainText().split(",") if word.strip()]
        start = self.ui.dateEdit_3.date().toString('yyMM')
        end = self.ui.dateEdit_4.date().toString('yyMM')

        def fetch(url):
            """GET *url*, retrying every 10 seconds until the request succeeds."""
            # Deliberate best effort: transient failures must not end the crawl.
            while True:
                try:
                    return get(url=url, headers=headers, proxies=proxies)
                except Exception:
                    sleep(10)

        def total_tag(cp):
            """Return the total shown in the tag page header, as text."""
            html = HTML(fetch(f'https://www.lofter.com/tag/{cp}').text)
            header = html.xpath('//*[@id="tagpageheader"]/div/div/div[2]/div[1]/div[2]/div[1]/div[2]/text()')[0]
            # The number sits between the words '浏览' and '参与'.
            total = header.split('浏览')[1].split('参与')[0].strip()
            self.ms_3.text_print.emit('tag总数:' + total)
            return total

        def single_cp(cp):
            """Return (new posts per month, highest popularity per month) for one tag."""
            hots = [[] for _ in res]
            new = [0 for _ in res]
            # all_month is ordered newest first.
            month_index = {month: idx for idx, month in enumerate(all_month)}
            newest, oldest = all_month[0], all_month[-1]
            for page in range(1, 100000):
                self.ms_3.text_print.emit('page:' + str(page))
                r5_text = fetch(f'https://www.lofter.com/tag/{cp}/new?page={page}').text
                sleep(0.5)
                passed_start = False
                for p in HTML(r5_text).xpath('//*[@id="main"]/div[@data-blogid]'):
                    tag = p.xpath('./div[2]/div/div[2]/div[3]/div[1]/span/a/span/text()')
                    # Skip posts carrying any blocked tag.
                    if any(word in tag for word in shield_tag):
                        continue
                    timestamp = p.xpath('./div[2]/div/div[1]/a/@data-time')[0]
                    # 'yyMM' key, the same format as the entries of all_month.
                    month = timestamp13_to_date(timestamp)[:4]
                    if month > newest:
                        # Newer than the requested range: ignore the post.
                        continue
                    if month < oldest:
                        # Older than the requested range: the crawl is finished.
                        passed_start = True
                        break
                    j = month_index.get(month)
                    if j is None:
                        continue
                    self.ms_3.text_print.emit('date:' + month)
                    try:
                        hot = p.xpath('./div[2]/div/div[2]/div[3]/div[2]/span[1]/a/text()')[0][3:-1] or '0'
                    except IndexError:
                        hot = '0'
                    hots[j].append(int(hot))
                    new[j] += 1
                if passed_start:
                    break
                # No next-page button means the last page has been processed.
                # NOTE(review): '\000' is a NUL escape in a Python literal; the
                # selector is kept verbatim -- confirm against the original script.
                if not HTMLParser(r5_text).css_first('span.w-iar2\000r'):
                    break
            max_hot = []
            for i in res:
                if not hots[i] and not new[i]:
                    max_hot.append(0)
                else:
                    max_hot.append(max(hots[i]))
                self.ms_3.text_print.emit('时间:' + all_month[i])
                self.ms_3.text_print.emit('月度新增:' + str(new[i]))
                self.ms_3.text_print.emit('最高热度:' + str(max_hot[i]))
            return new, max_hot

        def download_history():
            """Collect the statistics of every tag and save them as one Excel sheet."""
            total_list = {}
            dfs = []
            data = {}
            for cp in top_cp:
                self.ms_3.text_print.emit('统计中,请稍等:' + cp)
                total_list[cp] = total_tag(cp)
                new, max_hot = single_cp(cp=cp)
                data[cp] = [[cp, total_list[cp], new[i], max_hot[i]] for i in res]
            # One frame per month, holding one row per tag.
            for i in res:
                df = DataFrame([rows[i] for rows in data.values()],
                               columns=['tag名称', 'tag总数', '月度新增', '最高热度'])
                df.set_index(['tag名称', 'tag总数'], inplace=True)
                dfs.append(df)
            frame = concat(dfs, keys=all_month, axis=1)
            frame.columns.names = ['时间', '数据']
            self.ms_3.text_print.emit('result:')
            self.ms_3.text_print.emit(str(frame))
            frame.to_excel('新建 Microsoft Excel 工作表.xlsx')
            self.ms_3.text_print.emit('已保存至:新建 Microsoft Excel 工作表.xlsx')

        if int(start) > int(end):
            self.ms_3.text_print.emit('请重新输入时间')
        else:
            # Every 'yyMM' key between start and end, newest first.
            year = list(range(int(end[:2]), int(start[:2]) - 1, -1))
            month = ['12', '11', '10', '09', '08', '07', '06', '05', '04', '03', '02', '01']
            all_month = [str(i) + j for i in year for j in month]
            all_month = [i for i in all_month if int(start) <= int(i) <= int(end)]
            res = range(len(all_month))
            download_history()
            # NOTE(review): the original indentation was lost; 'END' is emitted
            # after a completed run only -- confirm against the original script.
            self.ms_3.text_print.emit('END')

    # `daemon=True` replaces the deprecated Thread.setDaemon().
    t_3 = Thread(target=run_3, daemon=True)
    t_3.start()
做出来表头长这样:
这篇关于Lofter存档助手的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-16MyBatis-Plus资料入门教程:快速上手指南
- 2024-11-16Mybatis资料入门教程:新手必看指南
- 2024-11-16MyBatis资料详解:新手入门与初级实战指南
- 2024-11-16MyBatisPlus资料:初学者入门指南与实用教程
- 2024-11-16MybatisPlus资料详解:初学者入门指南
- 2024-11-16MyBatisX资料:新手入门与初级教程
- 2024-11-16RESTful接口资料详解:新手入门指南
- 2024-11-16RESTful接口资料详解:新手入门教程
- 2024-11-16Spring Boot资料:新手入门与初级教程
- 2024-11-16Springboot资料:新手入门与初级教程