Python刷网页抓取需要的信息
以下脚本(Web-Inspector.py)每隔 10 秒抓取一次网页(某医院专家号页面),当页面内容满足特定条件(该专家号开始放号)时,播放提示音。
- snippet.python
#!/usr/bin/python
#coding: utf-8
"""Web-Inspector: poll a web page and play a sound once it matches.

A background TimerThread downloads a fixed page at a regular interval and
looks for a sequence of marker strings.  Every poll result is appended to a
Tkinter Text widget; when all markers are present an audio file is launched
through the shell.

This is a Python 2 script (Tkinter / urllib2).
"""
import atexit
import ctypes
import ctypes.wintypes
import shutil
import threading
import os
import re
import sys
from Tkinter import *
import textwrap
import codecs
import string
from datetime import *
import time
import subprocess
import multiprocessing
import locale
import urllib2

# Global parameters
g_APPTitle = "网页巡视员"
g_Timeout = 1.0
g_CodePage = 'cp936'
g_FakePrint = None
g_Oldstderr = None
g_thrdWI = None
# Serialises access to the Text widget between the GUI and the poll thread.
g_rFakePrint = threading.RLock()

# Defaults used by WITask(); other pages that were watched in the past:
#   North campus: http://www.eztcn.com/Home/Disease/docTable/hosid/50/depid/1099/docid/528.html
#   South campus: http://www.eztcn.com/Home/Disease/docTable/hosid/71/depid/1325/docid/638.html
g_WatchUrl = "https://www.eztcn.com/Home/Disease/docTable/hosid/506/depid/44150/docid/10205.html"
# Markers that must all occur in the page, in this order.
g_WatchMarkers = (
    ' 2017-12-09 ',
    '<span class=\"yy_date_week\">预约</span>',
    ' 2017-12-12 ',
)
g_AlertSound = "found.mp3"


class FakePrint:
    """File-like sink that writes into the application's Text widget.

    An instance replaces ``sys.stderr`` while the GUI is alive, so none of
    its methods may raise: a failure to display text is dropped silently.
    """

    def __init__(self):
        self.str = ''

    def _update(self, action):
        """Run ``action(widget)`` under the lock, then scroll to the end."""
        try:
            # ``with`` releases the lock even when the Tk call fails; the
            # previous acquire()/release() pair left it held on error.
            with g_rFakePrint:
                widget = myapp.txPrint
                action(widget)
                widget.yview_scroll(1000000, 'units')
        except Exception:
            # Deliberate best effort: ``myapp`` may not exist yet or the
            # widget may already be destroyed, and reporting the problem
            # would go through this very object (it is sys.stderr).
            return

    def show(self, str):
        """Append the text ``str`` to the widget."""
        self._update(lambda widget: widget.insert(END, str))

    def clear(self):
        """Remove all text from the widget."""
        self._update(lambda widget: widget.delete(0.0, END))

    def write(self, *args, **kwargs):
        """File-protocol entry point; positional args go to ``Text.insert``.

        Keyword arguments are accepted and ignored.
        """
        self._update(lambda widget: widget.insert(END, *args))


class TimerThread(threading.Thread):
    """Thread that calls ``function`` every ``interval`` seconds until stopped.

    Parameters:
        interval: seconds to wait before each call.
        function: callable invoked as ``function(*args, **kwargs)``.
        args:     optional positional arguments (default: none).
        kwargs:   optional keyword arguments (default: none).
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        threading.Thread.__init__(self)
        self.interval = interval
        self.function = function
        # Fresh containers per instance instead of shared mutable defaults.
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.finished = threading.Event()

    def stop(self):
        """Ask the loop to exit; takes effect at the next wait."""
        self.finished.set()

    def run(self):
        # Init COM for this thread.  ctypes.oledll only exists on Windows.
        ole32 = getattr(ctypes, 'oledll', None)
        ole32 = ole32.ole32 if ole32 is not None else None
        if ole32 is not None:
            ole32.CoInitialize(None)
        try:
            while True:
                self.finished.wait(self.interval)
                if self.finished.is_set():
                    break
                self.function(*self.args, **self.kwargs)
        finally:
            # CoUninitialize has to run on the thread that initialised COM,
            # so it is paired here rather than deferred to an atexit hook.
            if ole32 is not None:
                ole32.CoUninitialize()


def GetCurTime(needDate):
    """Return the local time as text, with the date in front if ``needDate``."""
    fmt = u"%Y-%m-%d %H:%M:%S" if needDate else u"%H:%M:%S"
    return time.strftime(fmt)


def _FetchPage(url, timeout):
    """Download ``url`` and return the raw body; the response is always closed."""
    response = urllib2.urlopen(url, data=None, timeout=timeout)
    try:
        return response.read()
    finally:
        response.close()


def _ContainsInOrder(text, markers):
    """Return True if every marker occurs in ``text`` in the given order.

    Each search starts at the index where the previous marker was found.
    """
    pos = 0
    end = len(text)
    for marker in markers:
        pos = text.find(marker, pos, end)
        if pos < 0:
            return False
    return True


def WITask(url=g_WatchUrl, markers=g_WatchMarkers, sound=g_AlertSound, timeout=8):
    """Build the polling callable that is handed to TimerThread.

    Parameters:
        url:     page to download on every poll.
        markers: strings that must all appear in the page, in order.
        sound:   command passed to ``os.system`` when the markers are found.
        timeout: network timeout in seconds.

    Returns a zero-argument function.  Each call logs "Found!", "Not found!"
    or "Except!" through ``g_FakePrint`` and returns the function itself
    (kept for backward compatibility; TimerThread ignores the value).
    """
    def _WITaskFun():
        try:
            page = _FetchPage(url, timeout)
            if _ContainsInOrder(page, markers):
                g_FakePrint.show("[" + GetCurTime(True) + u"] Found!\r\n")
                os.system(sound)
                return _WITaskFun
        except Exception:
            # A failed poll must not kill the timer thread; log and retry
            # on the next tick.
            g_FakePrint.show("[" + GetCurTime(True) + u"] Except!\r\n")
            return _WITaskFun
        g_FakePrint.show("[" + GetCurTime(True) + u"] Not found!\r\n")
        return _WITaskFun
    return _WITaskFun


class MyApp:
    """Main window: an Exit button above a scrollable log Text widget.

    Creating an instance also installs a FakePrint as ``sys.stderr``;
    ``exitapp`` restores the previous stream.
    """

    def __init__(self, parent):
        global g_FakePrint
        global g_Oldstderr
        g_FakePrint = FakePrint()
        g_Oldstderr = sys.stderr
        sys.stderr = g_FakePrint

        parent.title(g_APPTitle)
        self.myParent = parent
        self.frm = Frame(parent)
        self.frm.pack()

        self.btnExit = Button(self.frm, command=self.btnExitClick)
        self.btnExit.configure(text="Exit", background="green")
        self.btnExit.pack(side=TOP)

        self.txScrollV = Scrollbar(self.frm, orient=VERTICAL)
        self.txScrollH = Scrollbar(self.frm, orient=HORIZONTAL)
        self.txPrint = Text(
            self.frm, width=100, height=30, wrap='none',
            foreground="white", background="black",
            insertbackground="green", insertwidth=5,
            yscrollcommand=self.txScrollV.set,
            xscrollcommand=self.txScrollH.set)
        self.txScrollV.config(command=self.txPrint.yview)
        self.txScrollH.config(command=self.txPrint.xview)
        self.txScrollV.pack(fill="y", expand=0, side=RIGHT, anchor=N)
        self.txScrollH.pack(fill="x", expand=0, side=BOTTOM, anchor=N)
        self.txPrint.pack(side=BOTTOM)
        self.txPrint.focus_force()

        # Center the window.
        parent.update()
        curWidth = 750
        curHeight = 450
        scrWidth, scrHeight = parent.maxsize()
        tmpCfg = '%dx%d+%d+%d' % (
            curWidth, curHeight,
            (scrWidth - curWidth) // 2, (scrHeight - curHeight) // 2)
        parent.geometry(tmpCfg)

    def exitapp(self):
        """Stop the polling thread and restore the original ``sys.stderr``."""
        global g_thrdWI
        global g_Oldstderr
        if g_thrdWI is not None:
            g_thrdWI.stop()
            g_thrdWI = None
        if g_Oldstderr is not None:
            sys.stderr = g_Oldstderr

    def btnExitClick(self):
        """Exit button handler: clean up, then destroy the root window."""
        self.exitapp()
        self.myParent.destroy()


if __name__ == '__main__':
    multiprocessing.freeze_support()

    # Command code page
    g_CodePage = locale.getpreferredencoding()

    root = Tk()
    myapp = MyApp(root)

    bContinue = True
    # TODO: bContinue = ReadConfig()
    if bContinue:
        root.title(g_APPTitle)
        # NOTE(review): the poll thread writes to the Text widget from
        # outside the main thread - confirm the Tcl build in use is threaded.
        g_thrdWI = TimerThread(10, WITask())
        g_thrdWI.start()
        g_FakePrint.show("[" + GetCurTime(True) + u"] start.\r\n")
        root.mainloop()
    myapp.exitapp()
运行界面:
打赏作者以资鼓励: