# Source page: User:YFdyh000/alexa ranking zh.py
# (copy artifact from the wiki page; "外观" was the MediaWiki "Appearance" UI label)
# -*- coding: utf-8 -*-
# Copyright (C) Osama Khalid 2011. Released under AGPLv3+.
# Please write your feedback to [[User_talk:OsamaK]].
# This script updates Alexa rankings depending on a list on
# [[User:OsamaK/AlexaBot.js]]. The syntax of the list is:
# "Example (website) example.com"
# It could optionally include the "local" flag to fetch the local
# Alexa ranking (the one beside the 'Global ranking'):
# "Example (website) example.com local"
# This script was modified 2012-08-27 ~ 2012-09-01 by YFdyh000.
# New features: smartly get the website's domain name from the Wikipedia page, and more.
# alexa_ranking_zh.py -nocheck -page:
# alexa_ranking_zh.py -catr:網站
# alexa_ranking_zh.py -catr:网站小作品 -simulate
import re
import urllib
import shelve
import time
import config
from datetime import datetime
import wikipedia
import pagegenerators
class alexaBot:
    """Bot that refreshes the "| alexa =" infobox field of zh.wikipedia
    articles about websites with the current Alexa ranking."""

    def __init__(self):
        # Timestamp used for the {{as of}} / accessdate parts of the new field.
        self.now = datetime.now()
        self.month_names = [u'1月', u'2月', u'3月', u'4月', u'5月',
                            u'6月', u'7月', u'8月', u'9月',
                            u'10月', u'11月', u'12月']

        # Wiki connection; the bot account name comes from user config.
        self.site = wikipedia.getSite('zh', 'wikipedia')
        self.mybot_username = config.usernames[self.site.fam().name][self.site.language()]

        # Article-list sources, set from command-line flags in get_article_list().
        self.weblist = None
        self.locallist = None
        self.generator = None

        # Edit behaviour.
        self.always = False
        self.summary = u"[[User:YFdyh000/alexa_ranking_zh.py|机器人]]:更新Alexa排名(试运行)"
        self.botflag = True
        self.minorEdit = True
        self.dry = False

        # Output / pre-save checking options.
        self.quiet = False
        self.showdiff = True
        self.detect_reflist = False
        self.skip_lastedit_is_my = True
        self.checkLastUpdateTime = True
        #self.Last_edited_more_than = 0  # currently hard-coded at the call site

        # HTTP proxy used for the Alexa fetches; emptied by the -noproxy flag.
        self.urllibProxyConfig = { 'http': 'http://%s:%s@%s' % ('', '', '127.0.0.1:7117') }
        #self.database = shelve.open('alexa_rankings_zh.db')
def get_article_list(self):
genFactory = pagegenerators.GeneratorFactory()
for arg in wikipedia.handleArgs():
if arg == '-web':
self.weblist = True
elif arg == '-local':
self.locallist = True
elif arg == '-quiet':
self.quiet = True
elif arg == '-nocheck':
self.skip_lastedit_is_my = False
self.checkLastUpdateTime = False
elif arg == '-diff':
self.showdiff = True
elif arg == '-nodiff':
self.showdiff = False
elif arg == '-noproxy':
self.urllibProxyConfig = {}
#elif arg == '-dry': # -simulate
# self.dry = True
else:
genFactory.handleArg(arg)
if self.weblist:
list_page = wikipedia.Page(self.site,'User:OsamaK/AlexaBot.js').get()
elif self.locallist:
list_page = open('alexa_ranking.list').read().decode("utf-8") # Alternative list source.
if self.weblist or self.locallist:
#list_regex = '"(.+)" ([^ \n]+)[ ]?(local)?'
list_regex = '"(.+)" ?([^ \n]+)?[ ]?(local)?' # 新正则,提供网址可选
articles_list = re.findall(list_regex, list_page, re.M)
else:
if not self.generator:
self.generator = genFactory.getCombinedGenerator()
if not self.generator:
print (u'You have to specify the generator you want to use for the script!')
exit()
preloadingGen = pagegenerators.NamespaceFilterPageGenerator(self.generator, [0], wikipedia.getSite())
articles_list = []
for page in preloadingGen:
#需要增设例外名单
articles_list.append([page.title(), '', u'local']) #默认为所有条目增加当地排名,待改进
#print articles_list #FIXME: REMOVE
return articles_list
def get_alexa_ranking(self, alexa_url, article, old_ranking):
ranking_regex = '([\d,]+)[ \t]+\</div\>\n\<div class="label">Global Rank'
local_ranking_regex = '([\d,]+)[ \t]+\</div\>\n\<div class="label"\>' \
'Rank in\n\<a href=\'[^\']+\' title="([\w ]+)"'
title_regex = '\<title\>(.+)\</title\>'
if not self.quiet:
print "Fetching Alexa rank. Page:", article[0], "\nURL:", alexa_url
errortime=0
while True:
try:
alexa_text = urllib.urlopen(alexa_url,
proxies=self.urllibProxyConfig
).read()
break
except IOError:
print "Error fetching Alexa page. Retyring in 10" \
" seconds."
if errortime>1: #Number of retries
raise IOError
errortime+=1
time.sleep(10)
continue
alexa_ranking = re.findall(ranking_regex, alexa_text)[0]
alexa_title = re.findall(title_regex, alexa_text)[0]
if 'local' in article:
alexa_local_ranking, alexa_local_country = re.findall(
local_ranking_regex, alexa_text)[0]
local_ranking_text = u"<br />{{flagicon|%(country)s}} %(ranking)s" % \
{"country": alexa_local_country,
"ranking": alexa_local_ranking}
#print local_ranking_text
else:
local_ranking_text = ""
new_ranking = int(alexa_ranking.replace(',', ''))
difference = self.find_difference(article[1], new_ranking, old_ranking)
ranking_text = "%(diff)s%(g_ranking)s%(l_ranking)s" % \
{"diff": difference, "g_ranking": alexa_ranking,
"l_ranking": local_ranking_text}
return ranking_text, alexa_title, new_ranking
def find_difference(self, article_url, new_ranking, old_ranking):
#try:
# old_ranking = self.database[article_url] #改进为从条目上读取
#except KeyError: # If the website is newly added.
# old_ranking = 0
if not self.quiet:
print "[New Alexa ranking is", new_ranking, "old was", str(old_ranking)+']'
if old_ranking == 0:
difference = ""
elif old_ranking > new_ranking:
difference = "{{IncreaseNegative}} "
elif old_ranking < new_ranking:
difference = "{{DecreasePositive}} "
elif old_ranking == new_ranking:
difference = "{{Steady}} "
return difference
    def save_article(self, article_object, article_text, article_url,
                     old_alexa_field, new_alexa_field, new_ranking, old_article_text):
        """Splice new_alexa_field into the article and save the page.

        Shows a diff (unless quiet/always), asks for confirmation (unless
        self.always), and swallows the usual pywikipedia save exceptions
        with a log message.  Returns True after a save that raised no known
        error; returns None when the edit was declined or self.dry is set.
        """
        article_text = article_text.replace(old_alexa_field, new_alexa_field)
        edit_summary = self.summary
        if not self.quiet and self.showdiff and not self.always:
            wikipedia.showDiff(old_article_text, article_text)
        if not self.dry:
            if not self.always:
                choice = wikipedia.inputChoice(
                    u'Do you want to accept the changes?',
                    ['Yes', 'No', 'Always', 'Quit'],
                    ['y', 'n', 'a', 'q'], 'N')
                if choice == 'a':
                    self.always = True
                elif choice == 'q':
                    exit()
            # NOTE: when self.always was already True, 'choice' is never
            # bound — the short-circuit below keeps that safe.
            if self.always or choice == 'y':
                try:
                    # Save the page
                    #article_object.put(article_text, comment=edit_summary)
                    page = article_object
                    page.put(article_text, comment=edit_summary,
                             minorEdit=self.minorEdit, botflag=self.botflag)
                    # Throttle: pause 10 s between saves.
                    time.sleep(10)
                    #self.database[article_url] = new_ranking
                except wikipedia.LockedPage:
                    wikipedia.output(u"Page %s is locked; skipping."
                                     % page.title(asLink=True))
                except wikipedia.EditConflict:
                    wikipedia.output(
                        u'Skipping %s because of edit conflict'
                        % (page.title()))
                except wikipedia.SpamfilterError, error:
                    wikipedia.output(
                        u'Cannot change %s because of spam blacklist entry %s'
                        % (page.title(), error.url))
                else:
                    return True
        #article_object.put(article_text, comment=edit_summary)
def allow_bots(text, user):
return not re.search(r'\{\{(nobots|bots\|(allow=none|deny=.*?' + user + r'.*?|optout=all|deny=all))\}\}', text)
def checkLastUpdateTimeFun(self, article, signatureText, minInterval=7*24*60*60, revCountNum=50):
#article.get()
history = article.getVersionHistory(revCount=revCountNum)
for data in history:
try:
#[1/2/3] edit date/time(UTC), user name, edit summary
#if data[2] == self.mybot_username and data[3].lower().find(u'更新Alexa排名') >= 0:
#print data[2], data[3]
#wikipedia.output(data[3])
if (data[3].lower().find(signatureText)) >= 0:
lastupdatetime = data[1] # like 2010-12-04T10:30:53Z
lastupdatetime = datetime.strptime(lastupdatetime, "%Y-%m-%dT%H:%M:%SZ") # UTC time
#print lastupdatetime
foundbotupdate = True
break
except:
print 'error in checkLastUpdateTimeFun!'
else:
foundbotupdate = False # 没有发现机器人更新排名的编辑
if foundbotupdate:
lastupdate_timedelta = datetime.utcnow() - lastupdatetime
if lastupdate_timedelta.total_seconds() < minInterval: # 距上次更新不足时限
#continue
return True #不足时限,返回终止指令
else:
# print 'Ok, > 7 days.'
return False #大于7天,继续
else:
#print 'no found alexa update edit in the page history.'
return False #未发现曾有编辑,继续
def get_old_ranking_value(self, old_alexa_field):
old_ranking_value_regex = "\| *alexa *= *(?:\{\{[^\{\}<>]+\}\})? *([0-9, ]{1,8}).{1,6}?\{\{(?:as of|flagicon|flag icon|flag|flagcountry)"
try:
value_text = re.findall(old_ranking_value_regex, old_alexa_field, re.IGNORECASE)[0] # old ranking value on page
#print value_text
value_num = int(value_text.replace(',', ''))
return value_num
except IndexError:
return 0 # 未找到旧评分
    def run(self):
        """Main loop: for every target article, fetch its Alexa ranking and
        update (or insert) the '| alexa =' infobox field, then save."""
        alexa_field_regex = u"\| *alexa *= *.+[\|\n]"
        old_ranking_text_regex = u"\| *alexa *= *(.+)[\|\n]" # to be improved
        #url_field_regex = u"\| *url *= *\[.+?[\|\n]"
        url_field_regex = u"^ *\| *url *= *.+?[\|\n]"
        # NOTE(review): '^' is used without re.M when this is searched below,
        # so it only matches at the very start of the page text — probably
        # re.M was intended; confirm.
        reference_regex = u"(class=\'references-small|\<references|" \
            u"\{\{(refs|reflist|References|注脚|参表脚|注表脚|脚注ヘルプ|參考資料|参考列表|RefFoot|NoteFoot))"
        #print "Fetching articles list.."
        articles_list = self.get_article_list()
        """if self.database == {}: # If this is the first time.
            if not self.quiet:
                print "This seems to be the first time. No difference templete" \
                    " will be added."
            for article in articles_list:
                self.database[str(article[1])] = 0
        """
        for article in articles_list:
            # article: [title, url or '', 'local' or ''].
            article_name = article[0]
            # article_url = str(article[1])
            # alexa_url = "http://www.alexa.com/siteinfo/" + article_url
            article_object = wikipedia.Page(self.site, article_name)
            #if not self.quiet:
            #    print "Fetching %s page on Wikipedia.." % article_name
            try:
                article_text = article_object.get()
                old_article_text = article_text
            except wikipedia.NoPage:
                print "Page %s does not exist." % article_name
                continue
            except wikipedia.IsRedirectPage:
                # NOTE(review): old_article_text is not refreshed here — after
                # following a redirect the diff in save_article uses a stale
                # (previous iteration's) text, and on the first iteration this
                # would raise NameError. Confirm and fix.
                article_object = article_object.getRedirectTarget()
                article_name = article_object.title()
                article_text = article_object.get()
            if self.skip_lastedit_is_my and article_object.userName() == self.mybot_username:
                # If the bot itself made the last edit, the article is not
                # active enough or was updated too recently; skip it.
                if not self.quiet:
                    print "last editor of the [[%s]] page is the bot" % article_name
                continue
            if self.checkLastUpdateTime and self.checkLastUpdateTimeFun(article_object, u"alexa", 7*24*60*60, 50):
                # History shows an alexa update within the last 7 days; skip.
                if not self.quiet:
                    print "%s less than 7 days from the last update" % article_name
                continue
            if self.detect_reflist and not re.search(reference_regex, article_text, re.IGNORECASE):
                print "No refereence list in", article_name
                continue
            # article[1] is the supplied URL; empty string when not provided.
            if not article[1]:
                # No URL supplied: derive the domain from the infobox url field.
                try:
                    url_field_text=re.findall(r'\n *\| *url *= *(.+)', article_text)[0]
                    article_url=re.findall(r'https?://(([\w-]+\.)+[\w-]+)', url_field_text)[0][0]
                except IndexError:
                    print "Not found url in", article_name
                    continue
                article[1] = str(article_url)
            else:
                article_url = str(article[1])
            alexa_url = "http://www.alexa.com/siteinfo/" + article_url
            # If there is no Alexa field, add one under the URL field
            # (because the url field is a must for articles about websites)
            try:
                old_alexa_field = re.findall(alexa_field_regex, article_text, re.IGNORECASE)[0]
                if old_alexa_field.lower().find("[botnoedit]") >= 0:
                    print "Found [botnoedit] flag in %s, Skiping." % article_name
                    continue
            except IndexError:
                try:
                    url_field = re.findall(url_field_regex, article_text, re.IGNORECASE)[0]
                except IndexError:
                    print "No alexa or url fields in", article_name
                    continue
                old_alexa_field = "| alexa = "
                article_text = article_text.replace(url_field, \
                    url_field + old_alexa_field)
            try:
                #old_field_ranking = re.findall(old_ranking_text_regex, old_alexa_field)[0]
                old_field_ranking = re.findall(old_ranking_text_regex, old_alexa_field, re.IGNORECASE)[0]
                if old_field_ranking.strip() == '': # alexa field is just a space; to be improved
                    raise IndexError
                #print old_field_ranking
                # Numeric ranking currently on the page, before the update.
                old_ranking_on_page = self.get_old_ranking_value(old_alexa_field)
            except IndexError: # If the Alexa field wasn't there or was empty.
                old_ranking_on_page = -1 # sentinel: no previous ranking on the page
            try:
                ranking_text, alexa_title, new_ranking = self.get_alexa_ranking(
                    alexa_url, article, old_ranking_on_page)
            except IndexError:
                print "Couldn't find any ranking data on", alexa_url
                continue
            except IOError:
                print "Couldn't get page on", alexa_url
                continue
            #print ranking_text, alexa_title, new_ranking, old_ranking_on_page
            """if new_ranking == old_ranking_on_page:
                if not self.quiet:
                    print "Current total rank with old rank did not change, Skiping."
                continue
            不需要这段,需要去更新日期和持平标志
            """
            # Build the replacement field: trend marker + rank + {{as of}} +
            # <ref> citation with today's access date.
            new_field_ranking = u"%(ranking_text)s ({{as of|%(year)d|%(month)d|%(day)d" \
                u"|alt=%(year)d年%(month_name)s}})<ref name=\"alexa\">" \
                u"{{cite web|url=%(url)s|title=%(title)s" \
                u"|publisher=[[Alexa Internet]]" \
                u"|accessdate=%(year)d-%(month)02d-%(day)02d}}</ref>" \
                u"<!--Updated by YFdyh-bot.-->" % \
                {"ranking_text": ranking_text, "title": alexa_title,
                 "url": alexa_url, "year": self.now.year,
                 "month": self.now.month, "day": self.now.day,
                 "month_name": self.month_names[self.now.month-1]}
            # monthly
            #if old_ranking_on_page == 0:
            if old_ranking_on_page >= 0:
                # A previous value exists: replace the ranking text in place.
                new_alexa_field = old_alexa_field.replace(old_field_ranking, new_field_ranking)
            else:
                # No previous value: append the ranking to the empty field.
                new_alexa_field = old_alexa_field.strip() + " " + new_field_ranking + "\n"
            try:
                self.save_article(article_object, article_text,
                                  article_url, old_alexa_field,
                                  new_alexa_field, new_ranking, old_article_text)
            except wikipedia.IsRedirectPage:
                print "Weird error on %s. This shouldn't be a " \
                    "redirect!" % article_name
                continue
        #self.database.close()
if __name__ == '__main__':
    # Entry point: run the bot, and always release the pywikipedia
    # throttle/lock on the way out, even after an error or Ctrl-C.
    try:
        alexa_bot = alexaBot()
        try:
            alexa_bot.run()
        except KeyboardInterrupt:
            print('\nQuitting program...')
    finally:
        wikipedia.stopme()