知乎环境清理脚本-Python 3

缘起

书接上文:《纷争世事无穷尽》,昨晚写了简单的脚本,先在本机运行几天,排除各类Bug后再挂到SAE。

由于网址总数不多(一万七千左右),没用Bloom过滤器,直接使用txt保存+set查重复。SAE上貌似不能直接创建文件,得改成使用数据库。另外SAE只支持Python 2.7,还得改一些语法。


说明

功能简介

忽略已存在的提问,仅扫描并举报新出现的提问。为免爬得太快被封,仅使用单线程。

脚本中一共有三个类:

  • ZhiHuClient:登录类,维护一个登录知乎的session。
    首次使用时需要调用ZhiHuClient().login(username, password)方法登录并生成Cookies,人肉识别验证码。以后就可以直接用Cookies登录了。
  • Topics:待扫描的话题类。
  • Scanner:扫描和举报的工具类,脚本入口是scanning()方法。
    首次使用时需调用firstScanning()方法抓取已存在的问题网址。

完成初始化后,每次启动脚本只需一行代码:

Scanner().scanning()

输出如下:

==================================================
检测到cookie文件,直接使用cookie登录
已登陆账号: ........
已读取 checkedURLs 16931 个
开始扫描
已举报: 壬申、癸卯、戊申、乙卯,男,求先生看看事业……? 2015-12-16 12:03:07
已举报: 请帮忙看看这个八字,地支三合火,水为用神,又要走伤官大运 有点怕? 2015-12-16 12:22:47
已举报: 请各位高手随缘看一下婚姻和事业,以后发展怎么样? 2015-12-16 12:39:16
已举报: 能不能解释我做过的一些能预示未来的梦? 2015-12-16 12:46:29
已举报: 有没有算命的骗子在街上误拉住一位易学大师算命,反倒被大师算得跪地求解的事件? 2015-12-16 12:53:41
已举报: 老婆要生了,求个宝宝的名字,姓于,多谢? 2015-12-16 13:39:11
已举报: 麻烦大师帮忙看看手相? 2015-12-16 13:49:33
已举报: 求大神帮忙想个老人居住的地方的名字,复古,自然,朴实,有活力的? 2015-12-16 13:55:46
已举报: 从命理学角度看,如梵高这般死后才被世人承认而获得成就的人生可以被推断出来吗? 2015-12-16 14:10:14
已举报: 求算子嗣? 2015-12-16 14:21:35

每天不定时会集中收到私信通知处理结果,在https://www.zhihu.com/community/report可以看到本账号举报结果汇总:
举报结果.jpg
脚本跑了一天半,举报了120个左右的新问题,关闭率在95%以上。从图上最后一列可以发现,虽然脚本一律以「针对具体病情的求药问药」为理由举报所有问题,但是管理员接到举报,自己也会判断属于哪一类。所以不再需要在脚本中加入判断逻辑。

改进空间

  1. Scanner类的canReport()方法用于检测一个问题是否符合举报条件;getReason()方法判断举报原因。这两个规则还没开始写,目前一律以「针对具体病情的求药问药」为理由举报所有问题。管理员自己会判断
  2. 知乎管理员会私信回复举报结果。添加读取和保存举报结果的功能。
  3. 添加判断和举报回答的功能。
  4. 保存网址改成保存到数据库
  5. 改成Python 2.7 的语法
  6. 知乎崩溃时,程序也会报错。

ChangeLog

  • 2015年12月17日,每个话题下所有问题的网址保存在单独的文件中
  • 2015年12月14日,扫描和举报相关话题下所有新问题。

源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
#!/usr/bin/env python
# -*- coding: utf-8 -*-



__author__ = "loveNight"
__version__ = "beta 2.0"




import requests
import time
import json
import os
import re
import sys
import datetime
import subprocess
from bs4 import BeautifulSoup as BS





class ZhiHuClient(object):

"""连接知乎的工具类,维护一个Session
2015.11.11

用法:

client = ZhiHuClient()

# 第一次使用时需要调用此方法登录一次,生成cookie文件
# 以后可以跳过这一步
client.login("username", "password")

# 输出登录后的用户名
soup = BS(client.open(r"http://www.zhihu.com/").text, "lxml")
print(soup.find("span", class_="name").getText())

# 输出登录后的网页
print(client.open(r"http://www.zhihu.com/").text.encode("gbk", "ignore").decode("gbk"))

# 用这个session进行其他网络操作,详见requests库
session = client.getSession()
"""


# 网址参数是账号类型
TYPE_PHONE_NUM = "phone_num"
TYPE_EMAIL = "email"
loginURL = r"http://www.zhihu.com/login/{0}"
homeURL = r"http://www.zhihu.com"
captchaURL = r"http://www.zhihu.com/captcha.gif"

headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate",
"Host": "www.zhihu.com",
"Upgrade-Insecure-Requests": "1",
}

captchaFile = os.path.join(sys.path[0], "captcha.gif")
cookieFile = os.path.join(sys.path[0], "cookie")

def __init__(self):
os.chdir(sys.path[0]) # 设置脚本所在目录为当前工作目录

self.__session = requests.Session()
self.__session.headers = self.headers # 用self调用类变量是防止将来类改名
# 若已经有 cookie 则直接登录
self.__cookie = self.__loadCookie()
if self.__cookie:
print("检测到cookie文件,直接使用cookie登录")
self.__session.cookies.update(self.__cookie)
soup = BS(self.open(r"http://www.zhihu.com/").text, "lxml")
print("已登陆账号: %s" % soup.find("span", class_="name").getText())
else:
print("没有找到cookie文件,请调用login方法登录一次!")

# 登录
def login(self, username, password):
"""
验证码错误返回:
{'errcode': 1991829, 'r': 1, 'data': {'captcha': '请提交正确的验证码 :('}, 'msg': '请提交正确的验证码 :('}
登录成功返回:
{'r': 0, 'msg': '登陆成功'}
"""

self.__username = username
self.__password = password
self.__loginURL = self.loginURL.format(self.__getUsernameType())
# 随便开个网页,获取登陆所需的_xsrf
html = self.open(self.homeURL).text
soup = BS(html, "lxml") # 需要第三方库lxml,也可以用标准库中的html.parser
_xsrf = soup.find("input", {"name": "_xsrf"})["value"]
# 下载验证码图片
while True:
captcha = self.open(self.captchaURL).content
with open(self.captchaFile, "wb") as output:
output.write(captcha)
# 人眼识别
print("=" * 50)
print("已打开验证码图片,请识别!")
subprocess.call(self.captchaFile, shell=True)
captcha = input("请输入验证码:")
os.remove(self.captchaFile)
# 发送POST请求
data = {
"_xsrf": _xsrf,
"password": self.__password,
"remember_me": "true",
self.__getUsernameType(): self.__username,
"captcha": captcha
}
res = self.__session.post(self.__loginURL, data=data)
print("=" * 50)
# print(res.text) # 输出脚本信息,调试用
if res.json()["r"] == 0:
print("登录成功")
self.__saveCookie()
break
else:
print("登录失败")
print("错误信息 --->", res.json()["msg"])

def __getUsernameType(self):
"""判断用户名类型
经测试,网页的判断规则是纯数字为phone_num,其他为email
"""

if self.__username.isdigit():
return self.TYPE_PHONE_NUM
return self.TYPE_EMAIL

def __saveCookie(self):
"""cookies 序列化到文件
即把dict对象转化成字符串保存
"""

with open(self.cookieFile, "w") as output:
cookies = self.__session.cookies.get_dict()
json.dump(cookies, output)
print("=" * 50)
print("已在同目录下生成cookie文件:", self.cookieFile)

def __loadCookie(self):
"""读取cookie文件,返回反序列化后的dict对象,没有则返回None"""
if os.path.exists(self.cookieFile):
print("=" * 50)
with open(self.cookieFile, "r") as f:
cookie = json.load(f)
return cookie
return None

def open(self, url, delay=0, timeout=10):
"""打开网页,返回Response对象"""
if delay:
time.sleep(delay)
return self.__session.get(url, timeout=timeout)

def getSession(self):
return self.__session


class Topics(object):

"""待扫描的话题类,从topics.json文件中读取

用法:
t = Topics()
topics = t.topics
topics.update(dict2)
t.save2File
# 也可以这样更新
t.save2File(dict2)
"""


def __init__(self, filename="topics.json"):
self.wholeFilename = os.path.join(sys.path[0], filename)
with open(self.wholeFilename, 'r', encoding='utf-8') as f:
try:
self.topics = json.load(f)
except Exception:
# 如果文件为空或出错,则重新初始化
self.topics = {
19699134:"术数",
19564692:"占卜",
19659169:"玄学",
19633456:"算卦",
}
self.save2File()

def save2File(self, topic_dicts=None):
if topic_dicts:
self.topics.update(topic_dicts)
with open(self.wholeFilename, 'w', encoding='utf-8') as f:
json.dump(self.topics, f)


class Question(object):
"""问题类"""
def __init__(self):
pass





class Scanner(object):

"""扫描类,用于执行扫描操作
由于数据量不大,直接把某话题下所有的问题都存进txt文件,不使用布隆过滤器
"""

re_questionId = re.compile(r'data-id="(\d+)"')
re_xsrf = re.compile(r'name="_xsrf"\s+value="([^"]*?)"')
re_maxPage = re.compile(r'href="\?page=(\d+)"')

reportURL = r'https://www.zhihu.com/report'
questionListUrl = r'https://www.zhihu.com/topic/{topicId}/questions?page={page}'
URL_PREFIX = r'https://www.zhihu.com'
reportResultUrl = r'https://www.zhihu.com/community/report?page={page}'

def __init__(self):
client = ZhiHuClient()
self.session = client.getSession()
self.open = client.open
self.post = self.session.post
self.topics = Topics().topics
self.loadCheckedURLs()



def scanning(self):
"""持续扫描"""
print("开始扫描")
while True:
for topicId in self.topics.keys():
page = 1
flag = True
while flag:
url = self.questionListUrl.format(topicId = topicId, page=page)
html = self.open(url).text
soup = BS(html, "html.parser")
tags = soup("a", class_="question_link")
questionURLs = [self.URL_PREFIX + x["href"] for x in tags]
for x in questionURLs:
if x in self.topicCheckedURLs[topicId]:
flag = False
break
else:
self.topicCheckedURLs[topicId].add(x)
if not x in self.allCheckedURLs:
self.reportQuestion(x)
self.allCheckedURLs.add(x)
page += 1
# 写入文件
self.saveCheckedURLs()
time.sleep(60) # 间隔一分钟



def reportQuestion(self, questionURL):
"""检查问题,符合条件则举报"""
html = self.open(questionURL).text
if self.canReport(html):
postData = self.getPostData(html)
res = self.post(self.reportURL, data = postData, timeout = 15)
print("已举报: %s %s" % (BS(html,"html.parser").find("title").getText().replace(r"- 知乎","").strip(), str(datetime.datetime.now())[:-7]))
return res.json()["msg"]


def getPostData(self, html):
"""构造举报用的Post请求
"""

questionId = self.re_questionId.findall(html)[0]
xsrf = self.re_xsrf.findall(html)[0]
postData = {
"type":"question",
"id":questionId,
"reason":self.getReason(html),
"_xsrf":xsrf
}
return postData



def getReason(self, html):
"""从问题描述中判断举报原因

暂时一律使用「代为完成的个人任务」,慢慢改进算法
原因代号:
10005 问题表意不明
10006 代为完成的个人任务
10007 投票、征集类问题
10008 包含主观个人判断
10009 包含未经证实的传言
10010 针对具体病情的求药问药
"""

return "10010"


def canReport(self, html):
"""符合举报条件
以后慢慢改进算法,目前一律返回True
"""

return True



def firstScanning(self):
"""脚本首次运行时扫描一次,找出扫描话题下所有已经存在的问题"""

with open(r'F:\hexo\知乎清理脚本\Scanned.txt','r',encoding='utf-8') as f:
self.allCheckedURLs = set(f.read().strip().split('\n'))
for topicId in self.topics.keys():
self.topicCheckedURLs[topicId] = set(self.getQuestionUrls(topicId))

# 临时
tmp = [url for url in self.topicCheckedURLs[topicId] if not url in self.allCheckedURLs]
for x in tmp:
try:
self.reportQuestion(x)
except Exception:
print("举报失败:", x)

self.saveCheckedURLs()
print("第一次扫描完成!")



def saveCheckedURLs(self):
"""保存已扫描URL
SAE上貌似不能直接保存文件,需要使用数据库,届时再改,先在本机运行几天
"""

for topicId, file in self.scannedUrlFiles.items():
with open(file, 'w', encoding='utf-8') as f:
f.write("\n".join(self.topicCheckedURLs[topicId]))

def loadCheckedURLs(self):
"""读取已扫描URL
不同话题下的问题互相交叉
"""

self.scannedUrlFiles = {topicId:os.path.join(sys.path[0], topicName + "Scanned.txt") for topicId,topicName in self.topics.items()}
self.topicCheckedURLs = dict()
self.allCheckedURLs = set()
for topicId,file in self.scannedUrlFiles.items():
if os.path.isfile(file):
with open(file, 'r', encoding = 'utf-8') as f:
self.topicCheckedURLs[topicId] = set(f.read().strip().split('\n'))
self.allCheckedURLs |= self.topicCheckedURLs[topicId]
print("已读取 %s 话题 checkedURLs %s 个" % (self.topics[topicId],len(self.topicCheckedURLs[topicId])))
else:
self.topicCheckedURLs[topicId] = set()
print("载入完毕,一共有不重复 checkedURLs %s 个" % len(self.allCheckedURLs))


def getQuestionUrls(self, topicId):
"""返回指定话题下所有的问题URL列表
为免爬得太快被封,使用单线程
"""

urls = []
# 获取最大页数
url = self.questionListUrl.format(topicId = topicId, page = 1)
maxPage = self.getMaxPageNum(url)
pageURLs = (self.questionListUrl.format(topicId = topicId, page = x) for x in range(1, maxPage+1))
for url in pageURLs:
print("正在扫描 %s 话题下的网址: %s" %(self.topics[topicId], url))
time.sleep(0.3)
try:
html = self.open(url).text
except Exception:
continue
soup = BS(html, "html.parser")
tags = soup("a", class_="question_link")
questionURLs = [self.URL_PREFIX + x["href"] for x in tags]
urls.extend(questionURLs)
return urls

def getMaxPageNum(self, url):
"""提取网页上的最大页码数"""
pageNums = [int(x) for x in self.re_maxPage.findall(self.open(url).text)]
return max(pageNums)

def getAllReportResult(self):
"""读取所有举报处理结果"""
url = self.reportResultUrl.format(page = 1)
maxPage = self.getMaxPageNum(url)
pageURLs = (self.reportResultUrl.format(page = x) for x in range(1,maxPage+1))




if __name__ == '__main__':
# client = ZhiHuClient()

# 第一次使用时需要调用此方法登录一次,生成cookie文件
# 以后可以跳过这一步
# client.login("username", "password")



s = Scanner()
# # 首次运行脚本时初始化,将已有话题的网址存到文件中。
# s.firstScanning()
s = s.scanning()


# 增加新话题
# t = Topics()
# topics = t.topics
# topics[19633456] = "算卦"
# t.save2File()
loveNight wechat
我的微信公众号,放一些有趣的内容,不定期更新。