沒耐心看文章的請后直接:
git clone https://github.com/derekhe/mobike-crawler python3 crawler.py
爽了以后請別忘了給個star和!
analysis - jupyter做數(shù)據(jù)分析
influx-importer - 導(dǎo)入到influxdb,但之前沒怎么弄好
modules - 代理模塊
web - 實時圖形化顯示模塊,當(dāng)時只是為了學(xué)一下react而已,效果請見這里
crawler.py - 爬蟲核心代碼
importToDb.py - 導(dǎo)入到postgres數(shù)據(jù)庫中進(jìn)行分析
sql.sql - 創(chuàng)建表的sql
start.sh - 持續(xù)運行的腳本
核心代碼放在crawler.py中,數(shù)據(jù)首先存儲在sqlite3數(shù)據(jù)庫中,然后去重復(fù)后導(dǎo)出到csv文件中以節(jié)約空間。
摩拜單車的API返回的是一個正方形區(qū)域中的單車,我只要按照一塊一塊的區(qū)域移動就能抓取到整個大區(qū)域的數(shù)據(jù)。
left,top,right,bottom定義了抓取的范圍,目前是成都市繞城高速之內(nèi)以及南至南湖的正方形區(qū)域。offset定義了抓取的間隔,現(xiàn)在以0.002為基準(zhǔn),在DigitalOcean 5$的服務(wù)器上能夠15分鐘內(nèi)抓取一次。
def start(self): left = 30.7828453209 top = 103.9213455517 right = 30.4781772402 bottom = 104.2178123382 offset = 0.002 if os.path.isfile(self.db_name): os.remove(self.db_name) try: with sqlite3.connect(self.db_name) as c: c.execute('''CREATE TABLE mobike (Time DATETIME, bikeIds VARCHAR(12), bikeType TINYINT,distId INTEGER,distNum TINYINT, type TINYINT, x DOUBLE, y DOUBLE)''') except Exception as ex: pass
然后就啟動了250個線程,至于你要問我為什么沒有用協(xié)程,哼哼~~我當(dāng)時沒學(xué)~~~其實是可以的,說不定效率更高。
由于抓取后需要對數(shù)據(jù)進(jìn)行去重,以便消除小正方形區(qū)域之間重復(fù)的部分,最后的group_data正是做這個事情。
executor = ThreadPoolExecutor(max_workers=250) print("Start") self.total = 0 lat_range = np.arange(left, right, -offset) for lat in lat_range: lon_range = np.arange(top, bottom, offset) for lon in lon_range: self.total += 1 executor.submit(self.get_nearby_bikes, (lat, lon)) executor.shutdown() self.group_data()
最核心的API代碼在這里。小程序的API接口,搞幾個變量就可以了,十分簡單。
def get_nearby_bikes(self, args): try: url = "https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do" payload = "latitude=%s&longitude=%s&errMsg=getMapCenterLocation" % (args[0], args[1]) headers = { 'charset': "utf-8", 'platform': "4", "referer":"https://servicewechat.com/wx40f112341ae33edb/1/", 'content-type': "application/x-www-form-urlencoded", 'user-agent': "MicroMessenger/6.5.4.1000 NetType/WIFI Language/zh_CN", 'host': "mwx.mobike.com", 'connection': "Keep-Alive", 'accept-encoding': "gzip", 'cache-control': "no-cache" } self.request(headers, payload, args, url) except Exception as ex: print(ex)
最后你可能要問頻繁的抓取IP沒有被封么?其實摩拜單車是有IP的訪問速度限制的,只不過破解之道非常簡單,就是用大量的代理。
我是有一個代理池,每天基本上有8000以上的代理。在ProxyProvider中直接獲取到這個代理池然后提供一個pick函數(shù)用于隨機(jī)選取得分前50的代理。請注意,我的代理池是每小時更新的,但是代碼中提供的jsonblob的代理列表僅僅是一個樣例,過段時間后應(yīng)該大部分都作廢了。
在這里用到一個代理得分的機(jī)制。我并不是直接隨機(jī)選擇代理,而是將代理按照得分高低進(jìn)行排序。每一次成功的請求將加分,而出錯的請求將減分。這樣一會兒就能選出速度、質(zhì)量最佳的代理。如果有需要還可以存下來下次繼續(xù)用。
class ProxyProvider: def init(self, min_proxies=200): self._bad_proxies = {} self._minProxies = min_proxies self.lock = threading.RLock() self.get_list() def get_list(self): logger.debug("Getting proxy list") r = requests.get("https://jsonblob.com/31bf2dc8-00e6-11e7-a0ba-e39b7fdbe78b", timeout=10) proxies = ujson.decode(r.text) logger.debug("Got %s proxies", len(proxies)) self._proxies = list(map(lambda p: Proxy(p), proxies)) def pick(self): with self.lock: self._proxies.sort(key = lambda p: p.score, reverse=True) proxy_len = len(self._proxies) max_range = 50 if proxy_len > 50 else proxy_len proxy = self._proxies[random.randrange(1, max_range)] proxy.used() return proxy
在實際使用中,通過proxyProvider.pick()選擇代理,然后使用。如果代理出現(xiàn)任何問題,則直接用proxy.fatal_error()降低評分,這樣后續(xù)就不會選擇到這個代理了。
def request(self, headers, payload, args, url): while True: proxy = self.proxyProvider.pick() try: response = requests.request( "POST", url, data=payload, headers=headers, proxies={"https": proxy.url}, timeout=5,verify=False ) with self.lock: with sqlite3.connect(self.db_name) as c: try: print(response.text) decoded = ujson.decode(response.text)['object'] self.done += 1 for x in decoded: c.execute("INSERT INTO mobike VALUES (%d,'%s',%d,%d,%s,%s,%f,%f)" % ( int(time.time()) * 1000, x['bikeIds'], int(x['biketype']), int(x['distId']), x['distNum'], x['type'], x['distX'], x['distY'])) timespend = datetime.datetime.now() - self.start_time percent = self.done / self.total total = timespend / percent print(args, self.done, percent * 100, self.done / timespend.total_seconds() * 60, total, total - timespend) except Exception as ex: print(ex) break except Exception as ex: proxy.fatal_error()
好了,基本上就到此了~~~其他的代碼自己研究吧~~~
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com