记一次使用 Python 爬取B站up主信息经历

需求

尽可能获取 B 站美食博主列表 按照粉丝数排序 最终目的是能找出粉丝数前列 up 主的 id 名

思路

  • 将几个包含很多美食区视频的网页的 url 存起来
  • 通过请求 url 拿到尽可能多的投稿视频的链接
  • 通过视频链接拿到 up 主的主页链接 并分析出 up 主的 uid
  • 通过 uid 请求信息接口 拿到 up 主的 id 以及粉丝数

用到的库及其作用

csv : 读写 csv 文件
josn : 读写 josn 文件
time : 进行延时操作
random : 随机数功能实现
codecs : 打开文件 (解决中文写入文件后乱码问题)
requests : 请求库
selenium : 模拟浏览器
UserAgent : 随机 UA 库
threading : 多线程库
BeautifulSoup : 解析 HTML 与 XML 的库

遇到的问题以及解决方案

拿不到 html 结构

可能是因为没有设置 UA 服务器无法确认请求是否由用户发出

提前设置好请求头 UA(浏览器标识), 伪装成浏览器, 模拟用户正常请求, 防止 ip 被 ban 如果已经被 ban 那就算啦

这里可以自己设置 UA 为指定浏览器标识 也可以用库来随机生成

这里建议使用 fake_useragent 库来随机生成 UA 标识

安装 fake_useragent:

1
pip insatll fake_useragent

使用:

1
2
3
from fake_useragent import UserAgent
UA = UserAgent().random
headers = {'User-Agent': UA}

拿到的 html 结构不完整

可能是因为网页动态加载的(拿到网页并用浏览器解析之后还需要 js 来动态更新网页内容), 导致拿到的网页的 html 结构残缺, 拿不到想要的数据

可以使用 Selenium 库来模拟浏览器爬虫 这样他会等到网页加载完全再拿出网页的 html

安装 Selenium:

1
pip install selenium

下载你使用的对应版本的浏览器驱动:

使用:

1
2
3
4
5
6
7
8
9
10
11
12
# driver = webdriver.Firefox()   # Firefox浏览器
# driver = webdriver.Chrome() # Chrome浏览器
# driver = webdriver.Ie() # Internet Explorer浏览器
# driver = webdriver.Edge() # Edge浏览器
# driver = webdriver.Opera() # Opera浏览器
# driver = webdriver.PhantomJS() # PhantomJS

# 这里使用对应的浏览器标识创建实例
browser = webdriver.Chrome()
browser.get(web)
# html 即为拿到的网页html结构字符串
html = browser.page_source

无法永久保存且不会乱码的数据

我太菜了 不会使数据库 所以只能存本地了 为了数据后续处理的方便 这里使用了 csv 文件格式存数据

同时使用了 codecs 打开文件来防止乱码

csv 格式文件很简单 Excel 也能打开并处理

Python 里有 csv 库来读写 csv 格式文件

安装 csvcodecs 库:

1
pip install csv codecs

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
filename = "data.csv"
# ab 为追加 wb 为覆盖 更详细的内容自行搜索
csv_file = codecs.open(filename, 'ab', "gbk")
# 打开实例
csv_writer = csv.writer(csv_file)
# 写入表头
csv_writer.writerow(("第一项","第二项"))
info = (1,2)
infos = [(1, 2), (3, 4)]
# 写入一行数据
csv_writer.writerow(info)
# 写入多行数据
csv_writer.writerow(infos)

返回的数据是 json 格式

使用 json 库处理

安装 json 库:

1
pip install json

使用:

1
2
3
4
5
6
7
8
import json
jsonData = '{"name": "Viki"}'
# json字符串转为Python的字典(或者其他数据)
data = json.load(jsonData)
print(data['name']) # Viki
# Python的字典(或者其他数据)转为json字符串
jsonStr = json.dumps(data)
print(jsonStr) '{"name": "Viki"}'

爬取速度太慢

使用 threading 库实现多线程

安装 threading:

1
pip install threading

使用:

1
2
3
4
5
6
lst = [123, 456, 789]
def double(n):
return n*n
for n in lst:
double_thread = threading.Thread(target=double, args=(n))
double_thread.start()

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import csv
import json
import time
import random
import codecs
import urllib
import requests
import threading
from bs4 import BeautifulSoup
from selenium import webdriver
from fake_useragent import UserAgent

biliFoodUrls = [
'https://www.bilibili.com/v/food',
'https://www.bilibili.com/v/food/make',
'https://www.bilibili.com/v/food/rural',
'https://www.bilibili.com/v/food/record',
'https://www.bilibili.com/v/food/detective',
'https://www.bilibili.com/v/food/measurement',
'https://www.bilibili.com/v/popular/rank/food'
]


def grabVideoUrl(url, videoUrls, upUrls, upUids, upInfos, browser):
if not url:
return
if url[0] == '/':
url = 'https:'+url
if 'bilibili.com/video' in url:
videoUrls.append(url)
print(f'捕获到视频url:\t{url}')
elif 'space.bilibili.com' in url:
upUrls.append(url)
print(f'捕获到up主url:\t{url}')


def getVideosUrl(videoUrls, upUrls, upUids, upInfos, browser):
for web in biliFoodUrls:
print(f"开始处理页面:\t{web}")
browser.get(web)
html = browser.page_source
webBf = BeautifulSoup(html, 'html.parser')
al = webBf.find_all("a")
for a in al:
url = a.get('href')
args = (url, videoUrls, upUrls, upUids, upInfos, browser)
video_thread = threading.Thread(target=grabVideoUrl, args=args)
video_thread.start()


def grabUpsUrl(videoUrl, videoUrls, upUrls, upUids, upInfos, browser):
print(f"开始处理视频url:\t{videoUrl}")
headers = {
'User-Agent': UserAgent().random,
'Upgrade-Insecure-Requests': '1',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate, sdch, br',
'Accept-Language': 'zh-CN,zh;q=0.8',
}
html = requests.request('GET', videoUrl, headers=headers).content
webBf = BeautifulSoup(html, "html.parser")
upDiv = webBf.find(id='v_upinfo')
if not upDiv:
print(f"获取视频up主信息遭到拦截\t{videoUrl}")
return
upUrl = upDiv.find_all('a')[0].get('href')
if upUrl[0] == '/':
upUrl = 'https:' + upUrl
upUrls.append(upUrl)
print(f'捕获到up主url:\t{upUrl}')


def getUpsUrl(videoUrls, upUrls, upUids, upInfos, browser):
for videoUrl in videoUrls:
args = (videoUrl, videoUrls, upUrls, upUids, upInfos, browser)
upUrl_thread = threading.Thread(target=grabUpsUrl, args=args)
upUrl_thread.start()


def getUpsId(videoUrls, upUrls, upUids, upInfos, browser):
for upUrl in upUrls:
if not upUrl:
continue
upUid = upUrl.split("#")[0].split('/')[-1]
upUids.append(upUid)
print(f'捕获到up主id:\t{upUid}')


def getUpsInfo(videoUrls, upUrls, upUids, upInfos, browser):
api = 'https://api.bilibili.com/x/web-interface/card?jsonp=jsonp&mid='
ids = set(upUids)
upUids = list(ids)

for index, upUid in enumerate(upUids):

print(f"开始处理mid:{upUid}\t进度:{index+1}/{len(upUids)}")

headers = {
'User-Agent': UserAgent().random,
'Upgrade-Insecure-Requests': '1',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate, sdch, br',
'Accept-Language': 'zh-CN,zh;q=0.8',
}

res = requests.request('GET', f"{api}{upUid}", headers=headers)
data = res.json()['data']
n = 0

while((not data) and n < 100):
print(f"请求遭到服务器拦截 重试第{n+1}次中...\t总进度:{index+1}/{len(upUids)}")
headers = {
'User-Agent': UserAgent().random,
'Upgrade-Insecure-Requests': '1',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate, sdch, br',
'Accept-Language': 'zh-CN,zh;q=0.8',
}
res = requests.request('GET', f"{api}{upUid}", headers=headers)
n += 1
data = res.json()['data']

if n == 100:
continue

print(f"读取up主信息(mid={upUid})成功")

# if not data:
# print(f"读取up主信息(mid={upUid})的请求遭到服务器拦截")
# continue

upName = data['card']['name']
follower = data['follower']
mid = data['card']['mid']
upInfos.append({'id': upName, 'follower': follower, 'mid': mid})

upInfos = sorted(upInfos, key=lambda upInfo: upInfo['follower'])

print("写入Excel文件中...")
filename = "bili.csv"
bili_csv = codecs.open(filename, 'ab', "gbk")
bili_csv_writer = csv.writer(bili_csv)

for upInfo in upInfos:
msg = f"写入中\tUp主id:{upInfo['id']}\t粉丝数:{upInfo['follower']}\tmid={upInfo['mid']}"
print(msg)

info = (upInfo['id'], upInfo['follower'], upInfo['mid'])
try:
bili_csv_writer.writerow(info)
except:
continue

bili_csv.close()

print(f"写入到{filename}完成 本次共爬取到{len(upInfos)}个up主信息")


def main(browser):
videoUrls = []
upUrls = []
upUids = []
upInfos = []
getVideosUrl(videoUrls, upUrls, upUids, upInfos, browser)
getUpsUrl(videoUrls, upUrls, upUids, upInfos, browser)
time.sleep(20)
getUpsId(videoUrls, upUrls, upUids, upInfos, browser)
getUpsInfo(videoUrls, upUrls, upUids, upInfos, browser)


if __name__ == '__main__':
browser = webdriver.Chrome()
for i in range(100):
print(f"开始第{i+1}次爬取...")
main(browser)
browser.close()

参考

Selenium Python 教程 - 知乎