Python网络数据采集_注释笔记

初见网络爬虫

BeautifulSoup 简介

from urllib.request import urlopen
from bs4 import BeautifulSoup

html = urlopen('http://www.smallapping.com')
bsObj = BeautifulSoup(html.read(), features='lxml')
print(bsObj.h1)
print(bsObj.html.body.p)

调用 bsObj.tagName 只能获取页面中的第一个指定的标签。
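一个小示意(假设该页面里有多个 span 标签),对比两种写法:

print(bsObj.span)                   # 只返回页面中的第一个 span
print(len(bsObj.findAll('span')))   # findAll 才返回全部 span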

可靠的网络连接

from urllib.request import urlopen
from urllib.error import HTTPError
from bs4 import BeautifulSoup


def getTitle(url):
    try:
        html = urlopen(url)
    except HTTPError:
        return None
    try:
        bsObj = BeautifulSoup(html.read(), 'lxml')
        title = bsObj.body.h1
    except AttributeError:
        return None
    return title


title = getTitle('http://www.smallapping.com')
if title is None:
    print('Title could not be found')
else:
    print(title)

复杂HTML解析

不是一直都要用锤子

在面对埋藏很深或格式不友好的数据时, 千万不要不经思考就写代码,一定要三思而后行:

  • 寻找“打印此页”的链接,或者看看网站有没有 HTML 样式更友好的移动版(把自己的请求头设置成处于移动设备的状态,然后接收网站移动版)。

  • 寻找隐藏在 JavaScript 文件里的信息。要实现这一点,你可能需要查看网页加载的JavaScript 文件。 我曾经要把一个网站上的街道地址(以经度和纬度呈现的)整理成格式整洁的数组时,查看过内嵌谷歌地图的 JavaScript 文件,里面有每个地址的标记点。

  • 虽然网页标题经常会用到,但是这个信息也许可以从网页的 URL 链接里获取。

  • 如果你要找的信息只存在于一个网站上, 别处没有,那你确实是运气不佳。如果不只限于这个网站, 那么你可以找找其他数据源。有没有其他网站也显示了同样的数据?网站上显示的数据是不是从其他网站上抓取后攒出来的?

再端一碗

from urllib.request import urlopen
from urllib.error import HTTPError
from bs4 import BeautifulSoup


html = urlopen('http://www.pythonscraping.com/pages/warandpeace.html')
bs = BeautifulSoup(html, 'lxml')
nameList = bs.findAll('span', {'class': 'green'})

for name in nameList:
    print(name.get_text())

findAll和find

最常用的两个函数:

  1. findAll(tag, attributes, recursive, text, limit, keywords)

  2. find(tag, attributes, recursive, text, keywords)

bs.findAll(['p', 'span'], {'class': 'green'}, text='the prince', id='text')

text: 筛选内容与text完全匹配的标签

recursive:设置为 True, findAll 就会查找标签参数的所有子标签,以及子标签的子标签。为 False, findAll 就只查找文档的一级标签

limit: 只用于findAll,前x项,按照网页上的顺序

keywords: 选择具有特定属性的标签,如 id='text'。这是冗余功能,完全可以用其他技术替代:findAll(id='text') 等同于 findAll('', {'id': 'text'})。偶尔还会出现问题,比如 class='class' 会报错,因为 class 是 Python 的保留字,这时可以改用 class_='class' 或 {'class': 'class'}。
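下面是一个小示意,沿用上面 warandpeace 页面的 bs 对象,演示这几个参数的常见写法:

bs.findAll('span', {'class': 'green'}, limit=3)         # 只取按页面顺序的前 3 个
bs.findAll('span', {'class': 'green'}, recursive=True)  # 默认值;设为 False 则只查文档的一级标签
bs.findAll(id='text')                                   # 等同于 bs.findAll('', {'id': 'text'})
bs.findAll(class_='green')                              # class 是保留字,用 class_ 代替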

其他BeautifulSoup对象

  • BeautifulSoup对象

  • 标签Tag对象,find和findAll获得

  • NavigableString对象,表示标签里的文字

  • Comment对象,注释标签
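一个最小示意(HTML 片段是临时编的),可以直观看到这几类对象:

from bs4 import BeautifulSoup

soup = BeautifulSoup('<p><!-- 一条注释 -->正文文字</p>', 'lxml')
print(type(soup))      # BeautifulSoup 对象
print(type(soup.p))    # Tag 对象
for child in soup.p.children:
    print(type(child), repr(child))  # 依次是 Comment 对象和 NavigableString 对象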

导航树

from urllib.request import urlopen
from bs4 import BeautifulSoup

html = urlopen("http://www.pythonscraping.com/pages/page3.html")
bs = BeautifulSoup(html, 'lxml')
  1. 子标签和后代标签

bsObj.body.h1 选择了 body 标签后代里的第一个 h1 标签,不会去找 body 外面的标签。

子标签:bs.find('table', {'id': 'giftList'}).children

后代标签:bs.find('table', {'id': 'giftList'}).descendants

  2. 兄弟标签

next_siblings 和 previous_siblings,返回标签后面/前面的兄弟标签

返回表格中除标题行以外的所有行:

bs.find('table', {'id': 'giftList'}).tr.next_siblings

单标签版本:next_sibling 和 previous_sibling

  3. 父标签

parent 和 parents

img = bs.find('img', {'src': '../img/gifts/img1.jpg'})
print(img.parent.previous_sibling.get_text())
print(img.parent.previous)

正则表达式

a+b{5}(cc)*d?

a至少出现一次。
b重复5次。
c成对出现,即出现任意偶数次。
d可有可无。
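用 re 验证一下(示例字符串是临时编的):

import re

pattern = re.compile('a+b{5}(cc)*d?')
print(bool(pattern.fullmatch('aaabbbbbccccd')))  # True:3个a、5个b、2组cc、1个d
print(bool(pattern.fullmatch('bbbbbd')))         # False:a 至少要出现一次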

正则表达式常用符号:

(正则表达式常用符号表,图略)

正则表达式和BeautifulSoup

import re

for img in bs.findAll('img', {'src': re.compile('\.\./img/gifts/img.*\.jpg')}):
    print(img.attrs)   # 获取所有属性
    print(img['src'])  # 读取某一个属性

Lambda表达式

bs.findAll(lambda tag: len(tag.attrs) == 2)

开始采集

遍历单个域名

from urllib.request import urlopen
from bs4 import BeautifulSoup
from sys import stdout
import datetime
import random
import re

random.seed(datetime.datetime.now())


def getLinks(url):
    html = urlopen('http://en.wikipedia.org'+url)
    bs = BeautifulSoup(html, 'lxml')
    return bs.find('div', id='bodyContent').findAll(
        'a', href=re.compile('^/wiki/((?!:).)*$'))  # 页面链接的特点


stdout.flush()
links = getLinks('/wiki/Kevin_Bacon')
while len(links) > 0:
    newArticle = links[random.randint(0, len(links)-1)].attrs['href']
    print(newArticle)
    stdout.flush()
    links = getLinks(newArticle)

采集整个网站

深网和暗网:

(图略)

链接收集和链接去重:

from urllib.request import urlopen
from bs4 import BeautifulSoup
from sys import stdout
import re

pages = set()


def getLinks(url):
    global pages

    html = urlopen('http://en.wikipedia.org'+url)
    bs = BeautifulSoup(html, 'lxml')
    for link in bs.findAll('a', href=re.compile('^(/wiki/)')):
        href = link['href']
        if href is not None:
            if href not in pages:
                print(href)
                stdout.flush()
                pages.add(href)
                getLinks(href)  # 注意递归次数:Python 默认的递归限制(程序递归地自我调用次数)是 1000 次


getLinks('')  # 从主页开始
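针对上面注释里提到的递归限制,一个常见做法是改成显式栈的迭代版本。下面是沿用同样抓取逻辑的一个示意(非书中原代码,函数名为假设):

def crawlSite(startUrl=''):
    pages = set()
    stack = [startUrl]
    while stack:
        url = stack.pop()
        html = urlopen('http://en.wikipedia.org'+url)
        bs = BeautifulSoup(html, 'lxml')
        for link in bs.findAll('a', href=re.compile('^(/wiki/)')):
            href = link['href']
            if href is not None and href not in pages:
                print(href)
                pages.add(href)
                stack.append(href)   # 用栈代替递归,不受递归深度限制


crawlSite('')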

收集整个网站的数据:

from urllib.request import urlopen
from bs4 import BeautifulSoup
from sys import stdout
import re

pages = set()


def getLinks(url):
    global pages

    html = urlopen('http://en.wikipedia.org'+url)
    bs = BeautifulSoup(html, 'lxml')
    try:
        print(bs.h1.get_text())
        print(bs.find(id="mw-content-text").findAll("p")[0])
        print(bs.find(id="ca-edit").find("span").find("a").attrs['href'])
    except AttributeError:
        print('缺少属性,不过不必担心!')
    stdout.flush()

    for link in bs.findAll('a', href=re.compile('^(/wiki/)')):
        href = link['href']
        if href is not None:
            if href not in pages:
                print('------------------------\n'+href)
                stdout.flush()
                pages.add(href)
                getLinks(href)


getLinks('')

Scrapy

  1. scrapy startproject wikiSpider
  2. 在spiders文件夹下新建articleSpider.py
  3. 在items.py文件中定义新的item

每个Item对象表示网站上的一个页面,可以定义不同的条目(url、content、header、image等),这里只演示收集每页的title字段

import scrapy


class Article(scrapy.Item):
    title = scrapy.Field()
  4. articleSpider.py
from scrapy.selector import Selector
from scrapy import Spider

from wikiSpider.items import Article


class ArticleSpider(Spider):
    name = 'article'
    allowed_domains = ['en.wikipedia.org']
    start_urls = ['http://en.wikipedia.org/wiki/Main_Page',
                  'http://en.wikipedia.org/wiki/Python_%28programming_language%29']

    custom_settings = {  # 爬虫级设置
        'LOG_LEVEL': 'INFO',  # 只记录Info及以上级别
    }

    def parse(self, response):
        item = Article()
        title = response.xpath('//h1/text()')[0].extract()
        print('Title is:', title)
        item['title'] = title
        return item
  5. 启动爬虫

在主目录运行命令,scrapy crawl article

  6. 切换提取信息格式,指定日志文件
scrapy crawl article -s LOG_FILE=wiki.txt -o articles.csv -t csv
scrapy crawl article -o articles.json -t json
scrapy crawl article -o articles.xml -t xml

也可以把结果写入文件或数据库中,只要在parse部分增加相应代码即可
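例如,在 parse 里顺手把标题追加写进一个文本文件(文件名 titles.txt 为假设,仅作示意;正式项目更适合用 Item Pipeline):

# ArticleSpider 里 parse 方法的改写示意
def parse(self, response):
    item = Article()
    title = response.xpath('//h1/text()')[0].extract()
    with open('titles.txt', 'a', encoding='utf8') as f:
        f.write(title + '\n')   # 把每页标题追加写入文件
    item['title'] = title
    return item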

使用API

import urllib.request
from urllib.request import urlopen

token = "your api key"
webRequest = urllib.request.Request('http://myapi.com', headers={'token': token})
html = urlopen(webRequest)

解析json数据:

import json
from urllib.request import urlopen


def getCountry(ipAddress):
    response = urlopen('http://ip-api.com/json/' +
                       ipAddress).read().decode('utf8')
    dic = json.loads(response)
    return dic['countryCode']


print(getCountry(''))

抓取维基百科的编辑历史的贡献者IP地址:

from urllib.request import urlopen
from bs4 import BeautifulSoup
from urllib.error import HTTPError
import json
import datetime
import random
import re


random.seed(datetime.datetime.now())


def getLinks(articleUrl):
    html = urlopen("http://en.wikipedia.org"+articleUrl)
    bs = BeautifulSoup(html, 'lxml')
    return bs.find("div", {"id": "bodyContent"}).findAll("a", href=re.compile("^(/wiki/)((?!:).)*$"))


def getHistoryIPs(pageUrl):
    # 编辑历史页面URL链接格式是:
    # http://en.wikipedia.org/w/index.php?title=Title_in_URL&action=history
    pageUrl = pageUrl.replace("/wiki/", "")
    historyUrl = "http://en.wikipedia.org/w/index.php?title=" + pageUrl + "&action=history"
    print("history url is: "+historyUrl)
    html = urlopen(historyUrl)
    bs = BeautifulSoup(html, 'lxml')
    # 找出class属性是"mw-anonuserlink"的链接
    # 它们用IP地址代替用户名
    ipAddresses = bs.findAll("a", {"class": "mw-anonuserlink"})
    addressList = set()
    for ipAddress in ipAddresses:
        addressList.add(ipAddress.get_text())
    return addressList


def getCountry(ip):
    try:
        response = urlopen('http://ip-api.com/json/'+ip).read().decode('utf8')
    except HTTPError:
        return None

    responseJson = json.loads(response)
    return responseJson['countryCode']


links = getLinks("/wiki/Python_(programming_language)")

while len(links) > 0:
    for link in links:
        print('------------------')
        ips = getHistoryIPs(link['href'])
        for ip in ips:
            country = getCountry(ip)
            if country is not None:   # 查询出错时 getCountry 返回 None,跳过
                print(ip+' is from '+country)

    newLink = links[random.randint(0, len(links)-1)].attrs['href']
    links = getLinks(newLink)

存储数据

媒体文件

存储媒体文件的两种方式:只获取文件URL链接或直接下载源文件。

只获取媒体文件的URL的优缺点:

(图略)

下载一张图片:

from urllib.request import urlopen
from urllib.request import urlretrieve
from bs4 import BeautifulSoup

html = urlopen('http://www.pythonscraping.com')
bs = BeautifulSoup(html, 'lxml')
imgUrl = bs.find('a', id='logo').find('img')['src']
urlretrieve(imgUrl, 'logo.jpg')

下载所有具有src属性的资源:

import os
from urllib.request import urlopen
from urllib.request import urlretrieve
from bs4 import BeautifulSoup

dire = 'downloaded'
baseUrl = 'http://pythonscraping.com'


def getAbsoluteURL(baseUrl, source):
    if source.startswith('http://www.'):
        url = 'http://' + source[11:]
    elif source.startswith('http://'):
        url = source
    elif source.startswith('www.'):
        url = source[4:]          # 去掉开头的 "www."
        url = 'http://' + url
    else:
        url = baseUrl + '/' + source

    if baseUrl not in url:
        return None
    return url.split('?')[0]


def getDownloadPath(baseUrl, absoluteUrl, dire):
    path = absoluteUrl.replace('www.', '')
    path = path.replace(baseUrl, '')
    path = dire + path

    dire = os.path.dirname(path)
    if not os.path.exists(dire):
        os.makedirs(dire)

    return path


html = urlopen('http://www.pythonscraping.com')
bs = BeautifulSoup(html, 'lxml')
downloads = bs.findAll(src=True)
for download in downloads:
    fileUrl = getAbsoluteURL(baseUrl, download['src'])
    if fileUrl is not None:
        print(fileUrl)
        downPath = getDownloadPath(baseUrl, fileUrl, dire)
        urlretrieve(fileUrl, downPath)

把数据存储到CSV

创建csv文件:

import csv

with open('test.csv', 'w+') as csvFile:
    writer = csv.writer(csvFile)
    writer.writerow(('number', 'number plus 2', 'number times 2'))
    for i in range(10):
        writer.writerow((i, i+2, i*2))

常用场景,获取HTML表格并写入CSV文件:

import csv
from urllib.request import urlopen
from bs4 import BeautifulSoup

html = urlopen('http://en.wikipedia.org/wiki/Comparison_of_text_editors')
bs = BeautifulSoup(html, 'lxml')
table = bs.find('table', {'class': 'wikitable'})
rows = table.findAll('tr')

with open('editors.csv', 'w+', newline='', encoding='utf8') as csvFile:
    writer = csv.writer(csvFile)
    for row in rows:
        csvRow = []
        for cell in row.findAll(['td', 'th']):
            csvRow.append(cell.get_text())
        writer.writerow(csvRow)

MySQL

import pymysql
conn = pymysql.connect(host='127.0.0.1', unix_socket='/tmp/mysql.sock', user='root', passwd=None, db='mysql')
cur = conn.cursor()
cur.execute("USE scraping")
cur.execute("SELECT * FROM pages WHERE id=1")
print(cur.fetchone())
cur.close()
conn.close()

让数据库支持Unicode:

ALTER DATABASE scraping CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
ALTER TABLE pages CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
ALTER TABLE pages CHANGE title title VARCHAR(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
ALTER TABLE pages CHANGE content content VARCHAR(10000) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
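上面的 ALTER 语句假设 scraping 库和 pages 表已经建好;如果还没有,可以先用类似下面的语句建表(字段设计是按书中示例的假设,沿用前面代码里的 cur 游标):

cur.execute('CREATE DATABASE IF NOT EXISTS scraping')
cur.execute('USE scraping')
cur.execute('''CREATE TABLE IF NOT EXISTS pages (
    id BIGINT(7) NOT NULL AUTO_INCREMENT,
    title VARCHAR(200),
    content VARCHAR(10000),
    created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(id))''')   # 建表后再执行上面的字符集调整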

抓取并保存到MySQL:

from urllib.request import urlopen
from bs4 import BeautifulSoup

import re
import datetime
import random
import pymysql

conn = pymysql.connect(host='127.0.0.1', unix_socket='/tmp/mysql.sock',
                       user='root', passwd=None, db='mysql', charset='utf8')
cur = conn.cursor()
cur.execute("USE scraping")

random.seed(datetime.datetime.now())


def store(title, content):
    # 参数化查询,pymysql 会自动处理引号和转义
    cur.execute(
        "INSERT INTO pages(title, content) VALUES(%s, %s)", (title, content))
    cur.connection.commit()


def getLinks(articleUrl):
    html = urlopen("http://en.wikipedia.org"+articleUrl)
    bsObj = BeautifulSoup(html, 'lxml')
    title = bsObj.find("h1").get_text()
    content = bsObj.find("div", {"id": "mw-content-text"}).find("p").get_text()
    store(title, content)
    return bsObj.find("div", {"id": "bodyContent"}).findAll("a",
                                                            href=re.compile("^(/wiki/)((?!:).)*$"))


links = getLinks("/wiki/Kevin_Bacon")
try:
    while len(links) > 0:
        newArticle = links[random.randint(0, len(links)-1)].attrs["href"]
        print(newArticle)
        links = getLinks(newArticle)
finally:
    cur.close()
    conn.close()

email

发邮件:

import smtplib

from email.mime.text import MIMEText

msg = MIMEText('the body of email')
msg['Subject'] = 'an email'
msg['From'] = 'alan@python.com'
msg['To'] = 'some@python.com'

s = smtplib.SMTP('localhost')
s.send_message(msg)
s.quit()

Python 有两个包可以发送邮件: smtplib 和 email

email 模块里包含了许多实用的邮件格式设置函数,可以用来创建邮件“包裹”。下面的示例中使用的 MIMEText 对象,为底层的 MIME(Multipurpose Internet Mail Extensions,多用途互联网邮件扩展类型)协议传输创建了一封空邮件,最后通过高层的 SMTP 协议发送出去。MIMEText 对象 msg 包括收发邮箱地址、邮件正文和主题,Python 通过它就可以创建一封格式正确的邮件。

smtplib 模块用来设置服务器连接的相关信息。就像 MySQL 服务器的连接一样,这个连接必须在用完之后及时关闭,以避免同时创建太多连接而浪费资源。
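一个小示意:用 with 上下文管理器可以保证连接用完即关(Python 3.3 以后的 smtplib.SMTP 支持这种写法):

import smtplib
from email.mime.text import MIMEText

msg = MIMEText('the body of email')
msg['Subject'] = 'an email'
msg['From'] = 'alan@python.com'
msg['To'] = 'some@python.com'

with smtplib.SMTP('localhost') as s:   # 离开 with 块时自动调用 quit() 关闭连接
    s.send_message(msg)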

封装一下:

import smtplib
from email.mime.text import MIMEText
from bs4 import BeautifulSoup
from urllib.request import urlopen
import time


def sendMail(subject, body):
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = 'christmas_alerts@python.com'
    msg['To'] = 'alan@python.com'
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()


bs = BeautifulSoup(urlopen("https://isitchristmas.com/"), 'lxml')
while bs.find('a', {'id': 'answer'}).attrs['title'] == '不是':
    print('It is not Christmas yet.')
    time.sleep(3600)
    bs = BeautifulSoup(urlopen("https://isitchristmas.com/"), 'lxml')

sendMail('It\'s Christmas!',
         'According to http://itischristmas.com, it is Christmas!')

邮件程序可以做很多事情,可以发送网站访问失败、 应用测试失败的异常情况,也可以在 Amazon 网站上出现了一款卖到断货的畅销品时通知你。

读取文档

纯文本

from urllib.request import urlopen
page = urlopen('https://www.ietf.org/rfc/rfc1149.txt')
print(page.read())

对俄语(西里尔字母)文本正确解码后显示:

from urllib.request import urlopen
from bs4 import BeautifulSoup

textPage = urlopen(
"http://www.pythonscraping.com/pages/warandpeace/chapter1-ru.txt")
print(str(textPage.read(), 'utf8'))

html = urlopen("http://en.wikipedia.org/wiki/Python_(programming_language)")
bsObj = BeautifulSoup(html, 'lxml')
content = bsObj.find("div", {"id": "mw-content-text"}).get_text()
content = bytes(content, "UTF-8")
content = content.decode("UTF-8")

CSV

读取网络csv文件:

from urllib.request import urlopen
from io import StringIO
import csv

data = urlopen(
    'http://pythonscraping.com/files/MontyPythonAlbums.csv').read().decode('ascii', 'ignore')

dataFile = StringIO(data)
csvReader = csv.reader(dataFile)
for row in csvReader:
    print(row)  # row是一个列表,代表每一行(包括列头那一行)
from urllib.request import urlopen
from io import StringIO
import csv

data = urlopen(
    'http://pythonscraping.com/files/MontyPythonAlbums.csv').read().decode('ascii', 'ignore')

dataFile = StringIO(data)
dictReader = csv.DictReader(dataFile)
print(dictReader.fieldnames)  # ['Name', 'Year']
for row in dictReader:
    print(row)  # row是字典对象,OrderedDict([('Name', "Monty Python's Flying Circus"), ('Year', '1970')])

PDF

读取PDF文件,使用PDFMiner3K库,过程略。

docx

读取微软Word的.docx文件:

from zipfile import ZipFile
from urllib.request import urlopen
from io import BytesIO
from bs4 import BeautifulSoup

wordFile = urlopen("http://pythonscraping.com/pages/AWordDocument.docx").read()
wordFile = BytesIO(wordFile)
document = ZipFile(wordFile)
xml_content = document.read('word/document.xml')
wordObj = BeautifulSoup(xml_content.decode('utf-8'))
textStrings = wordObj.findAll("w:t")  # 读取文档正文
for textElem in textStrings:
    print(textElem.text)

数据清洗

编写代码清洗数据

语言模型n-gram:表示文字或语言中的 n 个连续的单词组成的序列。在进行自然语言分析时,使用 n-gram 或者寻找常用词组, 可以很容易地把一句话分解成若干个文字片段。

from urllib.request import urlopen
from bs4 import BeautifulSoup
import re
import string


def clearInput(input):
    input = re.sub('\n+', ' ', input)     # 把换行符(或者多个换行符)替换成空格
    input = re.sub('\[\d*\]', '', input)  # 去掉维基百科的引用标记
    input = re.sub(' +', ' ', input)      # 把连续的多个空格替换成一个空格
    input = bytes(input, 'utf8')
    input = input.decode('ascii', 'ignore')  # 先转成 bytes 再按 ASCII 解码,忽略无法表示的字符,以消除转义字符
    cleanInput = []
    input = input.split()
    for item in input:
        # 去除两端的标点符号。string.punctuation 包括:!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
        item = item.strip(string.punctuation)
        if len(item) > 1 or (item.lower() == 'a' or item.lower() == 'i'):
            cleanInput.append(item)
    return cleanInput


def ngrams(input, n):
    input = clearInput(input)
    output = []
    for i in range(len(input)-n+1):
        output.append(input[i:i+n])
    return output


html = urlopen('http://en.wikipedia.org/wiki/Python_(programming_language)')
bs = BeautifulSoup(html, 'lxml')
content = bs.find("div", {"id": "mw-content-text"}).get_text()
ngrams = ngrams(content, 2)
print(ngrams)
print("2-grams count is: "+str(len(ngrams)))

数据标准化:

from collections import OrderedDict


def ngrams_dict(input, n):
    input = clearInput(input)
    output = {}
    for i in range(len(input)-n+1):
        newNGram = ' '.join(input[i:i+n])
        if newNGram in output:
            output[newNGram] += 1
        else:
            output[newNGram] = 1
    return output


ngrams = ngrams_dict(content, 2)
ngrams = OrderedDict(sorted(ngrams.items(), key=lambda t: t[1], reverse=False))
print(ngrams)

数据存储后再清洗

使用OpenRefine,过程略。

自然语言处理

概括数据

import operator
from urllib.request import urlopen

content = urlopen(
    'http://pythonscraping.com/files/inaugurationSpeech.txt').read().decode('utf8')
ngrams = ngrams_dict(content, 2)
sortedNGrams = sorted(ngrams.items(), key=operator.itemgetter(1), reverse=True)
print(sortedNGrams)

排除常用单词:

def isCommon(ngram):
    commonWords = ["the", "be", "and", "of", "a", "in", "to", "have", "it",
                   "i", "that", "for", "you", "he", "with", "on", "do", "say", "this",
                   "they", "is", "an", "at", "but", "we", "his", "from", "that", "not",
                   "by", "she", "or", "as", "what", "go", "their", "can", "who", "get",
                   "if", "would", "her", "all", "my", "make", "about", "know", "will",
                   "as", "up", "one", "time", "has", "been", "there", "year", "so",
                   "think", "when", "which", "them", "some", "me", "people", "take",
                   "out", "into", "just", "see", "him", "your", "come", "could", "now",
                   "than", "like", "other", "how", "then", "its", "our", "two", "more",
                   "these", "want", "way", "look", "first", "also", "new", "because",
                   "day", "more", "use", "no", "man", "find", "here", "thing", "give",
                   "many", "well"]
    for word in ngram:
        if word in commonWords:
            return True
    return False


def ngrams_dict(input, n):
    input = clearInput(input)
    output = {}
    for i in range(len(input)-n+1):
        if not isCommon(input[i:i+n]):
            newNGram = ' '.join(input[i:i+n])
            if newNGram in output:
                output[newNGram] += 1
            else:
                output[newNGram] = 1
    return output

马尔可夫模型

马尔可夫模型:

(图略)

from urllib.request import urlopen
from random import randint


def wordListSum(wordList):
    sum = 0
    for word, value in wordList.items():
        sum += value
    return sum


def retrieveRandomWord(wordList):
    randIndex = randint(1, wordListSum(wordList))
    for word, value in wordList.items():
        randIndex -= value
        if randIndex <= 0:
            return word


def buildWordDict(text):
    # 剔除换行符和引号
    text = text.replace('\n', ' ')
    text = text.replace('\"', '')

    # 将标点符号保留在马尔可夫链中
    punctuation = [',', '.', ';', ':']
    for symbol in punctuation:
        text = text.replace(symbol, ' '+symbol+' ')

    words = text.split(' ')
    words = [word for word in words if word != '']

    wordDict = {}
    for i in range(1, len(words)):
        if words[i-1] not in wordDict:
            wordDict[words[i-1]] = {}
        if words[i] not in wordDict[words[i-1]]:
            wordDict[words[i-1]][words[i]] = 0
        wordDict[words[i-1]][words[i]] += 1
    return wordDict


text = urlopen(
    'http://pythonscraping.com/files/inaugurationSpeech.txt').read().decode('utf8')

wordDict = buildWordDict(text)

length = 1000
chain = ''
currentWord = 'I'
for i in range(0, length):
    chain += currentWord + ' '
    currentWord = retrieveRandomWord(wordDict[currentWord])
print(chain)

穿越网页表单与登录窗口进行采集

Python Requests

Python 的标准库 urllib 为你提供了大多数 HTTP 功能,但是它的 API 非常差。这是因为它是经过许多年一步步建立起来的——不同时期要面对的是不同的网络环境。于是为了完成最简单的任务,它需要耗费大量的工作(甚至要重写整个方法)。

Requests 库就是这样一个擅长处理那些复杂的 HTTP 请求、 cookie、 header(响应头和请求头)等内容的 Python 第三方库。

提交一个基本表单

import requests

params = {'firstname': 'Ryan', 'lastname': 'Mitchell'}
r = requests.post(
'http://pythonscraping.com/pages/files/processing.php', data=params)

print(r.text)

提交文件和图像

import requests

files = {'uploadFile': open('1.png', 'rb')}
r = requests.post(
'http://pythonscraping.com/pages/files/processing2.php', files=files)

print(r.text)

处理登录和cookie

import requests

session = requests.Session()

params = {'username': 'Ryan', 'password': 'password'}
s = session.post(
'http://pythonscraping.com/pages/cookies/welcome.php', params)

print('Cookie is set to:')
print(s.cookies.get_dict())
print('----------------------')
s = session.get('http://pythonscraping.com/pages/cookies/profile.php')
print(s.text)

session 对象会持续跟踪会话信息,比如 cookie、header,甚至包括运行 HTTP 协议的信息,比如 HTTPAdapter(为 HTTP 和 HTTPS 的连接会话提供统一接口)。

HTTP基本接入认证:

HTTP基本接入认证

import requests
from requests.auth import AuthBase
from requests.auth import HTTPBasicAuth

auth = HTTPBasicAuth('ryan', 'password')
r = requests.post(
url='http://pythonscraping.com/pages/auth/login.php', auth=auth)
print(r.text)

采集Javascript

在Python中用Selenium执行JavaScript:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import requests
import time

#1 只能采集到加载前的页面
r = requests.get('http://pythonscraping.com/pages/javascript/ajaxDemo.html')
print(r.text)


#2 使用Selenium模拟浏览器打开页面
browser = webdriver.Chrome()  # 将chromedriver.exe的路径加入到path中
browser.get('https://www.baidu.com')  # 新开一个浏览器

#3 使用Selenium模拟浏览器等待
chrome_options = Options()
chrome_options.add_argument('--headless')  # 无界面模式
chrome_options.add_argument('--disable-gpu')
driver = webdriver.Chrome(chrome_options=chrome_options)
driver.get('http://pythonscraping.com/pages/javascript/ajaxDemo.html')
time.sleep(3)
print(driver.find_element_by_id('content').text)
driver.close()

#4 Selenium的通用选择器
driver.find_elements_by_css_selector("#content")

#5 可以搭配BeautifulSoup来解析网页内容
pageSource = driver.page_source
bsObj = BeautifulSoup(pageSource, 'lxml')
print(bsObj.find(id="content").get_text())

隐式等待:

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Chrome()
driver.get('http://pythonscraping.com/pages/javascript/ajaxDemo.html')

try:
    element = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, 'loadedButton')))
finally:
    print(driver.find_element_by_id('content').text)
    driver.close()

WebDriverWait 和 expected_conditions,这两个模块组合起来构成了 Selenium 的隐式等待(implicit wait)。元素被触发的期望条件(expected condition)有很多种,包括:

  • 弹出一个提示框

  • 一个元素被选中(比如文本框)

  • 页面的标题改变了,或者某个文字显示在页面上或者某个元素里

  • 一个元素在 DOM 中变成可见的,或者一个元素从 DOM 中消失了
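这些期望条件都以 expected_conditions 模块里的函数形式提供,下面是一个小示意(等待的元素 id 和文字内容是假设的):

from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# 等待某个元素里出现指定文字
WebDriverWait(driver, 10).until(
    EC.text_to_be_present_in_element((By.ID, 'content'), 'important text'))
# 等待页面标题包含某个词
WebDriverWait(driver, 10).until(EC.title_contains('Ajax'))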

元素用定位器(locator)指定,By对象选择的策略:

  • ID:在上面的例子里用过;通过 HTML 的 id 属性查找元素

  • CLASS_NAME:通过 HTML 的 class 属性来查找元素

  • CSS_SELECTOR:通过 CSS 的 class、 id、 tag 属性名来查找元素,用 #idName、 .className、 tagName 表示

  • LINK_TEXT:通过链接文字查找 HTML 的 <a> 标签。例如,如果一个链接的文字是“Next”,就可以用 (By.LINK_TEXT, “Next”) 来选择

  • PARTIAL_LINK_TEXT:与 LINK_TEXT 类似,只是通过部分链接文字来查找

  • NAME:通过 HTML 标签的 name 属性查找。这在处理 HTML 表单时非常方便

  • TAG_NAME:通过 HTML 标签的名称查找

  • XPATH:用 XPath 表达式选择匹配的元素

XPath文档:https://msdn.microsoft.com/zh-cn/zn-CH/enus/library/ms256471
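配合 find_element 使用这些定位器的小示意(选择器字符串为假设):

from selenium.webdriver.common.by import By

driver.find_element(By.ID, 'content')
driver.find_element(By.CSS_SELECTOR, 'div#content')
driver.find_element(By.LINK_TEXT, 'Next')
driver.find_element(By.XPATH, "//div[@id='content']/a")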

处理重定向

识别一个页面已经完成重定向:从页面开始加载时就“监视 ” DOM 中的一个元素,然后重复调用这个元素直到 Selenium 抛出一个 StaleElementReferenceException 异常。也就是说,元素不在页面的DOM里了。

from selenium import webdriver
import time
from selenium.webdriver.remote.webelement import WebElement
from selenium.common.exceptions import StaleElementReferenceException


def waitForLoad(driver):
    elem = driver.find_element_by_tag_name('html')
    count = 0
    while True:
        count += 1
        if count > 20:
            print('Timing out after 10 seconds and returning')
            return
        time.sleep(.5)
        if elem != driver.find_element_by_tag_name('html'):
            return
        # try:
        #     elem == driver.find_element_by_tag_name('html')
        # except StaleElementReferenceException:
        #     return


driver = webdriver.Chrome()
driver.get('http://pythonscraping.com/pages/javascript/redirectDemo1.html')
waitForLoad(driver)
print(driver.page_source)

图像识别与文字处理

OCR库

Pillow(图像预处理)和 Tesseract(OCR 引擎)。

安装:conda install -c simonflueckiger tesserocr pillow

设置训练数据文件路径:setx TESSDATA_PREFIX 'D:\Program Files\Tesseract OCR\'

处理格式规范的文字

(图:demo1、demo2 两张示例图片,略)

tesseract demo.png text:将某个图片的文字识别出来,保存到text.txt文件中

如果图片背景有渐变色,文字识别会变得困难。这时可以用 Pillow 库创建一个阈值过滤器来去掉渐变的背景色,只把文字留下来,从而便于 Tesseract 读取。

from PIL import Image
import subprocess
import os


def cleanFile(filePath, newFilePath):
    image = Image.open(filePath)

    # 对图片进行阈值过滤,然后保存
    image = image.point(lambda x: 0 if x < 120 else 255)
    image.save(newFilePath)

    # 调用系统的tesseract命令对图片进行OCR
    subprocess.call(['tesseract', newFilePath, 'output'])

    # 打开文件读取结果
    with open('output.txt', 'r') as f:
        print(f.read())


cleanFile('demo1.png', 'demo_clean.png')

从网站图片中抓取文字:

import time
from urllib.request import urlretrieve
import subprocess
from selenium import webdriver

# 打开亚马逊《战争与和平》图书详情页
driver = webdriver.Chrome()
driver.get('http://www.amazon.com/War-Peace-Leo-Nikolayevich-Tolstoy/dp/1427030200')
time.sleep(2)

# 单击图书预览按钮
driver.find_element_by_id('sitbLogoImg').click()
imageList = set()

# 等待页面加载完成
time.sleep(5)
# 当向右箭头可以点击时,开始翻页
while 'pointer' in driver.find_element_by_id('sitbReaderRightPageTurner').get_attribute('style'):
    driver.find_element_by_id('sitbReaderRightPageTurner').click()
    time.sleep(2)
    # 获取已加载的新页面(一次可以加载多个页面,但是重复的页面不能加载到集合中)
    pages = driver.find_elements_by_xpath("//div[@class='pageImage']/div/img")
    for page in pages:
        image = page.get_attribute('src')
        imageList.add(image)
driver.quit()

# 用Tesseract处理我们收集的图片URL链接
for image in sorted(imageList):
    imgName = image[image[0:image.find('?')].rfind('/')+1:image.find('?')]
    name = imgName[0:imgName.rfind('.')]
    urlretrieve(image, imgName)
    p = subprocess.Popen(['tesseract', imgName, name],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    p.wait()
    f = open(name+'.txt', 'r')
    print(f.read())  # 读取识别结果

读取验证码与训练Tesseract

首先要把大量的验证码样本下载到一个文件夹里,建议使用验证码的真实结果给每个样本文件命名(即 4MmC3.jpg)。

第二步是准确地告诉 Tesseract 一张图片中的每个字符是什么,以及每个字符的具体位置。这里需要创建一些矩形定位文件(box file),示例:

4 15 26 33 55 0
M 38 13 67 45 0
m 79 15 101 26 0
C 111 33 136 60 0
3 147 17 176 45 0

第一列符号是图片中的每个字符,后面的 4 个数字分别是包围这个字符的最小矩形的坐标(图片左下角是原点 (0,0), 4 个数字分别对应每个字符的左下角 x 坐标、左下角 y 坐标、右上角 x 坐标和右上角 y 坐标),最后一个数字“0”表示图片样本的编号。

制作矩形定位文件的工具,Tesseract OCR Chopper(http://pp19dd.com/tesseract-ocr-chopper/)。

备份一下这个文件夹。

完成所有的数据分析工作和创建 Tesseract 所需的训练文件,一共有六个步骤。

一个 Python 版的解决方案(https://github.com/REMitchell/tesseract-trainer)。

获取验证码提交答案

常用的处理方法就是,首先把验证码图片下载到硬盘里,清理干净,然后用 Tesseract 处理图片,最后返回符合网站要求的识别结果。

from urllib.request import urlopen, urlretrieve
from bs4 import BeautifulSoup
import subprocess
import requests
from PIL import Image
from PIL import ImageOps


def cleanImage(imagePath):
    image = Image.open(imagePath)
    image = image.point(lambda x: 0 if x < 143 else 255)
    borderImage = ImageOps.expand(image, border=20, fill='white')
    borderImage.save(imagePath)


html = urlopen('http://www.pythonscraping.com/humans-only')
bs = BeautifulSoup(html, 'lxml')
# 收集需要处理的表单数据(包括验证码和输入字段)
imageLocation = bs.find('img', {'title': 'Image CAPTCHA'})['src']
formBuildId = bs.find('input', {'name': 'form_build_id'})['value']
captchaSid = bs.find('input', {'name': 'captcha_sid'})['value']
captchaToken = bs.find('input', {'name': 'captcha_token'})['value']

captchaUrl = 'http://pythonscraping.com'+imageLocation
urlretrieve(captchaUrl, 'captcha.jpg')
cleanImage('captcha.jpg')
p = subprocess.Popen(['tesseract', 'captcha.jpg', 'captcha'],
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.wait()
f = open('captcha.txt', 'r')

# 清理识别结果中的空格和换行符
captchaResponse = f.read().replace(' ', '').replace('\n', '')
print('Captcha solution attempt: '+captchaResponse)
if len(captchaResponse) == 5:
    params = {'captcha_token': captchaToken,
              'captcha_sid': captchaSid,
              'form_id': 'comment_node_page_form',
              'form_build_id': formBuildId,
              'captcha_response': captchaResponse,
              'name': 'Nobody',
              'subject': 'Nosubject',
              'comment_body[und][0][value]': '中文内容'}
    r = requests.post(
        'http://www.pythonscraping.com/comment/reply/10', data=params)
    responseObj = BeautifulSoup(r.text, 'lxml')
    if responseObj.find('div', {'class': 'messages'}) is not None:
        print(responseObj.find('div', {'class': 'messages'}).get_text())
else:
    print('There was a problem reading the CAPTCHA correctly!')

避开采集陷阱

让网络机器人看起来像人类用户

修改请求头

import json
import requests

session = requests.Session()
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36",
           "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8"}
url = 'https://www.zhihu.com/api/v4/questions/35441232/answers?include=data%5B%2A%5D.is_normal%2Cadmin_closed_comment%2Creward_info%2Cis_collapsed%2Cannotation_action%2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelevant_info%2Cquestion%2Cexcerpt%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%3Bdata%5B%2A%5D.mark_infos%5B%2A%5D.url%3Bdata%5B%2A%5D.author.follower_count%2Cbadge%5B%2A%5D.topics&limit=50&offset=0&sort_by=default'

req = session.get(url, headers=headers)

with open('d.json', 'w', encoding='utf8') as f:
    d = json.loads(req.text)
    json.dump(d, f, ensure_ascii=False, indent=4)

处理cookie

from selenium import webdriver

driver = webdriver.Chrome()
driver.get('http://pythonscraping.com')
driver.implicitly_wait(1)
print(driver.get_cookies())


driver.delete_all_cookies()
driver.add_cookie({'name': 'foo', 'value': 'bar', 'path': '/', 'secure': True})
print(driver.get_window_size())
driver.get_screenshot_as_file('foo.png')

避免蜜罐

from selenium import webdriver
from selenium.webdriver.remote.webelement import WebElement

driver = webdriver.Chrome()
driver.get('http://pythonscraping.com/pages/itsatrap.html')
links = driver.find_elements_by_tag_name('a')
for link in links:
    if not link.is_displayed():
        print('The link '+link.get_attribute('href')+' is a trap')

fields = driver.find_elements_by_tag_name('input')
for field in fields:
    if not field.is_displayed():
        print('Do not change value of '+field.get_attribute('name'))

问题检查表

如果你已经登录网站却不能保持登录状态,或者网站上出现了其他的“登录状态”异常,请检查你的 cookie。确认在加载每个页面时 cookie 都被正确调用,而且你的 cookie 在每次发起请求时都发送到了网站上。

如果你在客户端遇到了 HTTP 错误, 尤其是 403 禁止访问错误,这可能说明网站已经把你的 IP 当作机器人了,不再接受你的任何请求。你要么等待你的 IP 地址从网站黑名单里移除,要么就换个 IP 地址(可以去星巴克上网,或者看看第 14 章的内容)。如果你确定自己并没有被封杀,那么再检查下面的内容:

  • 确认你的爬虫在网站上的速度不是特别快。 快速采集是一种恶习,会对网管的服务器造成沉重的负担,还会让你陷入违法境地, 也是 IP 被网站列入黑名单的首要原因。给你的爬虫增加延迟,让它们在夜深人静的时候运行。切记:匆匆忙忙写程序或收集数据都是拙劣项目管理的表现;应该提前做好计划,避免临阵慌乱

  • 还有一件必须做的事情: 修改你的请求头!有些网站会封杀任何声称自己是爬虫的访问者。如果你不确定请求头的值怎样才算合适,就用你自己浏览器的请求头吧

  • 确认你没有点击或访问任何人类用户通常不能点击或接入的信息

用爬虫测试网站

Python 单元测试

  • 为每个单元测试的开始和结束提供 setUp 和 tearDown 函数

  • 提供不同类型的“断言”语句让测试成功或失败

  • 把所有以 test 开头的函数当作单元测试运行,忽略不带 test 的函数

import unittest


class TestAddition(unittest.TestCase):
    def setUp(self):
        print('Setting up the test')

    def tearDown(self):
        print('Tearing down the test')

    def test_twoPlusTwo(self):
        total = 2+2
        self.assertEqual(total, 4)

    def test_twoPlusOne(self):
        total = 2+1
        self.assertEqual(total, 4)


if __name__ == '__main__':
    unittest.main()

setUp 和 tearDown这两个函数在每个测试方法的开始和结束都会运行一次。

测试维基百科:

from urllib.request import urlopen
from bs4 import BeautifulSoup
import unittest


class TestWikipedia(unittest.TestCase):
    bsObj = None

    def setUpClass():
        print('1')
        global bsObj
        url = 'http://en.wikipedia.org/wiki/Monty_Python'
        bsObj = BeautifulSoup(urlopen(url), 'lxml')
        print('2')

    def test_titleText(self):
        global bsObj
        pageTitle = bsObj.find('h1').get_text()
        self.assertEqual('Monty Python', pageTitle)

    def test_contentExists(self):
        global bsObj
        content = bsObj.find('div', {'id': 'mw-content-text'})
        self.assertIsNotNone(content)


if __name__ == '__main__':
    unittest.main()

setUpClass函数只在类的初始化阶段运行一次(与每个测试启动时都运行的 setUp 函数不同)。

Selenium 单元测试

from selenium import webdriver

driver = webdriver.Chrome()
driver.get('http://en.wikipedia.org/wiki/Monty_Python')
assert 'Monty Python' in driver.title
driver.close()

写 Selenium 单元测试时可以比写 Python 单元测试更随意,断言语句甚至可以整合到生产代码中。

与网站进行交互:

Selenium 也可以对任何给定元素执行很多操作:

myElement.click()
myElement.click_and_hold()
myElement.release()
myElement.double_click()
myElement.send_keys_to_element("content to enter")

动作链:

from selenium import webdriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.common.keys import Keys
from selenium.webdriver import ActionChains

driver = webdriver.Chrome()
driver.get('http://pythonscraping.com/pages/files/form.html')

firstnameField = driver.find_element_by_name('firstname')
lastnameField = driver.find_element_by_name('lastname')
submitButton = driver.find_element_by_id('submit')

### 方法1 ###
firstnameField.send_keys('Ryan')
lastnameField.send_keys('Mitchell')
submitButton.click()

### 方法2 ###
actions = ActionChains(driver).click(firstnameField).send_keys(
'Ryan').click(lastnameField).send_keys('Mitchell').send_keys(Keys.RETURN) # Keys.RETURN 回车键
actions.perform()

print(driver.find_element_by_tag_name('body').text)
driver.close()
  1. 鼠标拖放动作
from selenium import webdriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver import ActionChains

driver = webdriver.Chrome()
driver.get('http://pythonscraping.com/pages/javascript/draggableDemo.html')
print(driver.find_element_by_id('message').text)

element = driver.find_element_by_id('draggable')
target = driver.find_element_by_id('div2')
actions = ActionChains(driver).drag_and_drop(element, target).perform()

print(driver.find_element_by_id('message').text)
  2. 截屏
from selenium import webdriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver import ActionChains

driver = webdriver.Chrome()
driver.get('http://www.pythonscraping.com/')
driver.get_screenshot_as_file('pythonscraping.png')

Python单元测试与Selenium单元测试的选择

Python 的单元测试语法严谨冗长,更适合为大多数大型项目写测试,而 Selenium 的测试方式灵活且功能强大,可以成为一些网站功能测试的首选。两者组合是最佳拍档。

远程采集

Tor代理服务器

洋葱路由(The Onion Router)网络,常用缩写为 Tor,是一种 IP 地址匿名手段。由网络志愿者服务器构建的洋葱路由器网络, 通过不同服务器构成多个层(就像洋葱)把客户端包在最里面。数据进入网络之前会被加密,因此任何服务器都不能偷取通信数据。另外,虽然每一个服务器的入站和出站通信都可以被查到, 但是要想查出通信的真正起点和终点,必须知道整个通信链路上所有服务器的入站和出站通信细节,而这基本是不可能实现的。

PySocks 是一个非常简单的 Python 代理服务器通信模块,它可以和 Tor 配合使用。

import socks
import socket
from urllib.request import urlopen

socks.set_default_proxy(socks.SOCKS5, "localhost", 9150)
socket.socket = socks.socksocket
print(urlopen('http://icanhazip.com').read())

如果你想在 Tor 里面用 Selenium 和 PhantomJS,不需要 PySocks,只需要增加 service_args 参数设置代理端口。

from selenium import webdriver

service_args = [ '--proxy=localhost:9150', '--proxy-type=socks5', ]
driver = webdriver.PhantomJS(executable_path='<path to PhantomJS>', service_args=service_args)
driver.get("http://icanhazip.com")
print(driver.page_source)
driver.close()

附录

robots.txt

#Welcome to my robots.txt file!
User-agent: *
Disallow: *
User-agent: Googlebot
Allow: *
Disallow: /private

Twitter 的 robots.txt 文件对 Google 的规则:
#Google Search Engine Robot
User-agent: Googlebot
Allow: /?_escaped_fragment_
Allow: /?lang=
Allow: /hashtag/*?src=
Allow: /search?q=%23
Disallow: /search/realtime
Disallow: /search/users
Disallow: /search/*/grid
Disallow: /*?
Disallow: /*/followers
Disallow: /*/following
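可以用标准库 urllib.robotparser 在抓取前检查某个 URL 是否被 robots.txt 允许。下面以上面的 Twitter 规则为例做一个小示意(实际结果取决于线上的 robots.txt):

from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.set_url('https://twitter.com/robots.txt')
rp.read()
print(rp.can_fetch('Googlebot', 'https://twitter.com/search?q=%23python'))  # 对照 Allow: /search?q=%23
print(rp.can_fetch('*', 'https://twitter.com/someone/followers'))           # 对照 Disallow: /*/followers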