FAIRYFAR-INTERNAL
 
  FAIRYFAR-INTERNAL  |  SITEMAP  |  ABOUT-ME  |  HOME  
Python刷网页抓取需要的信息

以下脚本(Web-Inspector.py,基于 Python 2,在 Windows 下运行)每隔 10 秒抓取一次网页(某医院专家号页面),当页面内容满足特定条件(该专家号开始放号)时,播放提示音。

snippet.python
#!/usr/bin/python
#coding: utf-8
 
import atexit
import codecs
import ctypes
import ctypes.wintypes
import locale
import multiprocessing
import os
import re
import shutil
import string
import subprocess
import sys
import textwrap
import threading
import urllib2

from datetime import *
from Tkinter import *

# Must stay after ``from datetime import *``: that star import binds the name
# ``time`` to the datetime.time class, and this line rebinds it to the module.
import time
 
#global param
# Window title; passed to the Tk root's title() by MyApp and by the entry point.
g_APPTitle = "网页巡视员"
# Not referenced in the visible code (presumably a timeout in seconds - confirm).
g_Timeout = 1.0
# Code page name; overwritten at startup with locale.getpreferredencoding().
g_CodePage = 'cp936'
# FakePrint instance that writes into the log widget; created by MyApp.__init__.
g_FakePrint = None
# The sys.stderr object saved by MyApp.__init__ so MyApp.exitapp can restore it.
g_Oldstderr = None

# TimerThread running the page check; created by the entry point and stopped
# (then reset to None) by MyApp.exitapp.
g_thrdWI = None
 
# Serializes access to the log widget between the GUI and the timer thread.
g_rFakePrint = threading.RLock()


class FakePrint:
    """File-like sink that appends text to the application's Tk log widget.

    An instance is installed as ``sys.stderr`` by ``MyApp`` and is also used
    directly for status messages. Every method is best effort: if the widget
    cannot be reached (for example ``myapp`` does not exist yet or the window
    has been destroyed) the text is dropped silently instead of raising,
    because a failing stderr replacement would only hide the original error.
    """

    def __init__(self):
        self.str = ''

    def show(self, str):
        """Append ``str`` to the end of the log and scroll to the bottom."""
        try:
            # ``with`` releases the lock even when the Tk call raises; a lock
            # left held here would block every other thread that logs.
            with g_rFakePrint:
                myapp.txPrint.insert(END, str)
                myapp.txPrint.yview_scroll(1000000, 'units')
        except Exception:
            return

    def clear(self):
        """Delete all text from the log widget."""
        try:
            with g_rFakePrint:
                myapp.txPrint.delete(0.0, END)
                myapp.txPrint.yview_scroll(1000000, 'units')
        except Exception:
            return

    def write(self, *args, **kwargs):
        """File-protocol entry point used when this object acts as stderr.

        Positional arguments are forwarded to the widget's ``insert`` call
        after the ``END`` index; keyword arguments are accepted and ignored.
        """
        try:
            with g_rFakePrint:
                myapp.txPrint.insert(END, *args)
                myapp.txPrint.yview_scroll(1000000, 'units')
        except Exception:
            return
 
class TimerThread(threading.Thread):
    """Thread that calls ``function`` repeatedly, once per ``interval`` seconds.

    The first call happens only after the first full interval has elapsed.
    Call ``stop()`` to end the loop; a stop request interrupts the current
    wait, so the thread exits without waiting out the interval.

    Args:
        interval: Seconds to wait before each call.
        function: Callable to invoke; its return value is ignored.
        args: Positional arguments for ``function`` (default: none).
        kwargs: Keyword arguments for ``function`` (default: none).
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        threading.Thread.__init__(self)
        self.interval = interval
        self.function = function
        # Fresh containers per instance instead of shared mutable defaults.
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.finished = threading.Event()

    def stop(self):
        """Ask the loop in ``run`` to exit."""
        self.finished.set()

    def run(self):
        """Initialize COM (Windows only), then call ``function`` until stopped.

        An exception raised by ``function`` propagates and ends the thread.
        """
        # ctypes.oledll exists only on Windows; skip COM setup elsewhere.
        ole32 = None
        oledll = getattr(ctypes, 'oledll', None)
        if oledll is not None:
            ole32 = oledll.ole32
            #Init COM for single thread
            ole32.CoInitialize(None)

        try:
            while True:
                self.finished.wait(self.interval)
                if self.finished.is_set():
                    break
                self.function(*self.args, **self.kwargs)
        finally:
            # Balance CoInitialize on this same thread, rather than deferring
            # it to an atexit handler that runs at interpreter shutdown.
            if ole32 is not None:
                ole32.CoUninitialize()
 
 
def GetCurTime(needDate):
    """Return the current local time formatted as text.

    A truthy ``needDate`` yields ``YYYY-MM-DD HH:MM:SS``; otherwise only
    ``HH:MM:SS`` is returned.
    """
    fmt = u"%Y-%m-%d %H:%M:%S" if needDate else u"%H:%M:%S"
    return time.strftime(fmt)
 
 
def WITask():
    """Build the callback that polls the appointment page once per call.

    Returns:
        A zero-argument function. Each call downloads the hard-coded schedule
        page (8 second timeout) and searches it for three markers that must
        occur in this order: ' 2017-12-09 ', the "yy_date_week" booking span,
        and ' 2017-12-12 '. Presumably that ordering means a bookable slot is
        listed in that date range - confirm against the page markup.

        The outcome is written to the log through ``g_FakePrint``:
        "Found!" (after which "found.mp3" is handed to the system shell),
        "Except!" when anything in the attempt raised, or "Not found!".
        The callback returns itself after "Found!" and "Except!" and None
        after "Not found!"; TimerThread discards the value, it is kept only
        for compatibility.
    """
    def _WITaskFun():
        try:
            # North campus: http://www.eztcn.com/Home/Disease/docTable/hosid/50/depid/1099/docid/528.html
            # South campus: http://www.eztcn.com/Home/Disease/docTable/hosid/71/depid/1325/docid/638.html
            f = urllib2.urlopen("https://www.eztcn.com/Home/Disease/docTable/hosid/506/depid/44150/docid/10205.html", data=None, timeout=8)
            try:
                web_str = f.read()
            finally:
                # Close the response even when reading it fails.
                f.close()

            # Each marker is searched for starting where the previous one
            # was found, so the markers must appear in this order.
            markers = (
                ' 2017-12-09 ',
                '<span class=\"yy_date_week\">预约</span>',
                ' 2017-12-12 ',
            )
            pos = 0
            for marker in markers:
                pos = web_str.find(marker, pos)
                if pos < 0:
                    break
            else:
                g_FakePrint.show("[" + GetCurTime(True) + u"] Found!\r\n")
                os.system("found.mp3")
                return _WITaskFun
        except Exception:
            # Network errors, timeouts, etc.: report and retry on the next tick.
            g_FakePrint.show("[" + GetCurTime(True) + u"] Except!\r\n")
            return _WITaskFun
        g_FakePrint.show("[" + GetCurTime(True) + u"] Not found!\r\n")

    return _WITaskFun
 
 
class MyApp:
    """Main window: an Exit button above a scrollable black log console.

    Creating an instance replaces ``sys.stderr`` with a FakePrint object so
    that error output is written into the log widget; ``exitapp`` restores
    the previous stream and stops the polling thread.
    """

    def __init__(self, parent):
        """Build the widgets inside ``parent`` (the Tk root) and center it."""
        global g_FakePrint
        global g_Oldstderr

        # Remember the real stderr, then route stderr into the log widget.
        g_FakePrint = FakePrint()
        g_Oldstderr = sys.stderr
        sys.stderr = g_FakePrint

        parent.title(g_APPTitle)
        self.myParent = parent

        self.frm = Frame(parent)
        self.frm.pack()

        self.btnExit = Button(self.frm, command=self.btnExitClick,
                              text="Exit", background="green")
        self.btnExit.pack(side=TOP)

        self.txScrollV = Scrollbar(self.frm, orient=VERTICAL)
        self.txScrollH = Scrollbar(self.frm, orient=HORIZONTAL)

        self.txPrint = Text(
            self.frm, width=100, height=30, wrap='none',
            foreground="white", background="black",
            insertbackground="green", insertwidth=5,
            yscrollcommand=self.txScrollV.set,
            xscrollcommand=self.txScrollH.set)
        self.txScrollV.config(command=self.txPrint.yview)
        self.txScrollH.config(command=self.txPrint.xview)

        # Scrollbars are packed before the text widget so they claim the
        # right and bottom edges first.
        self.txScrollV.pack(fill="y", expand=0, side=RIGHT, anchor=N)
        self.txScrollH.pack(fill="x", expand=0, side=BOTTOM, anchor=N)
        self.txPrint.pack(side=BOTTOM)
        self.txPrint.focus_force()

        #Center the window.
        parent.update()
        winW = 750
        winH = 450
        maxW, maxH = parent.maxsize()
        offsetX = (maxW - winW) / 2
        offsetY = (maxH - winH) / 2
        parent.geometry('%dx%d+%d+%d' % (winW, winH, offsetX, offsetY))

    def exitapp(self):
        """Stop the polling thread, if any, and restore the saved stderr."""
        global g_thrdWI

        if g_thrdWI is not None:
            g_thrdWI.stop()
            g_thrdWI = None
        if g_Oldstderr is not None:
            sys.stderr = g_Oldstderr

    def btnExitClick(self):
        """Exit button handler: clean up, then destroy the root window."""
        self.exitapp()
        self.myParent.destroy()
 
 
# Entry point: build the window, start the 10-second polling thread and run
# the Tk event loop until the window is closed.
if __name__ == '__main__':
    multiprocessing.freeze_support()

    #Command code page
    g_CodePage = locale.getpreferredencoding()

    root = Tk()
    # FakePrint looks up this module-level name to reach the log widget.
    myapp = MyApp(root)

    bContinue = True
    #bContinue = ReadConfig()

    if bContinue:
        root.title(g_APPTitle)
        g_thrdWI = TimerThread(10, WITask())
        g_thrdWI.start()

        try:
            g_FakePrint.show("[" + GetCurTime(True) + u"] start.\r\n")
            root.mainloop()
        finally:
            # Always stop the (non-daemon) timer thread and restore stderr,
            # even when the event loop exits through an exception; otherwise
            # the thread would keep the process alive.
            myapp.exitapp()

运行界面:

img



打赏作者以资鼓励:
移动端扫码阅读: