Tampermonkey userscript for extracting Hebei Department of Housing and Urban-Rural Development announcements (河北住建厅公告提取油猴脚本)

// ==UserScript==
// @name         河北住建厅公告提取
// @version      1.0
// @description  Extract announcements from the Hebei Department of Housing and Urban-Rural Development site at the click of a button; the extraction logic is chosen from the page URL
// @author       YourName
// @match        https://zfcxjst.hebei.gov.cn/*
// @require      https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js
// @grant        GM_xmlhttpRequest
// ==/UserScript==

(function () {
    'use strict';

    // Create a floating trigger button
    const button = document.createElement('button');
    button.innerText = '提取公告';
    button.style.position = 'fixed';
    button.style.top = '10px';
    button.style.right = '10px';
    button.style.zIndex = '99999';
    button.style.padding = '8px 12px';
    button.style.backgroundColor = '#007bff';
    button.style.color = 'white';
    button.style.border = 'none';
    button.style.borderRadius = '4px';
    button.style.cursor = 'pointer';
    button.style.fontSize = '14px';
    button.style.boxShadow = '0 2px 4px rgba(0,0,0,0.2)';
    document.body.appendChild(button);

    // Extract announcement data from pages whose lists carry a document number (div.pson_listWenhao)
    function extractHBZJTData_GongGaoGongShi(types) {
        console.log("页面标题:", document.title);
        const result = [];
        const contentDiv = document.querySelector('div.pson_listWenhao');
        if (!contentDiv) {
            console.log("未找到目标 div 元素");
            return;
        }
        contentDiv.querySelectorAll('li').forEach(li => {
            const aTag = li.querySelector('a');
            const dateSpan = li.querySelector('span.date');
            const wenhaoSpan = li.querySelector('span.wenhao');
            if (!aTag || !dateSpan) return;
            let href = aTag.getAttribute('href');
            let title = aTag.textContent.trim();
            let wenhao = '';
            if (wenhaoSpan) {
                wenhao = wenhaoSpan.textContent.trim();
            }
            // Turn site-relative links into absolute URLs
            let fullUrl = href;
            if (href.startsWith('/hbzjt')) {
                fullUrl = new URL(href, 'https://zfcxjst.hebei.gov.cn').href;
            }
            result.push({
                title: wenhao ? `[${wenhao}] ${title}` : title,
                url: fullUrl,
                date: dateSpan.textContent.trim(),
                type: types
            });
        });
        // Send the rows to the API
        sendToAPI(result);
    }

    function sendToAPI(dataArray) {
        GM_xmlhttpRequest({
            method: 'POST',
            url: 'http://192.168.196.81:8081/sys_api/api/buildingspider/batch',
            data: JSON.stringify(dataArray),
            headers: {
                'Content-Type': 'application/json'
            },
            onload: function (response) {
                console.log('数据发送成功:', response.responseText);
                alert('数据已成功发送到服务器!');
            },
            onerror: function (error) {
                console.error('数据发送失败:', error);
                alert('数据发送失败,请检查网络或服务器状态!');
            }
        });
    }

    // Extract news data from plain list pages (div.pson_list, no document number)
    function extractHBZJTData_XinWenZiXun(types) {
        console.log("页面标题:", document.title);
        const result = [];
        const contentDiv = document.querySelector('div.pson_list');
        if (!contentDiv) {
            console.log("未找到目标 div 元素");
            return;
        }
        contentDiv.querySelectorAll('li').forEach(li => {
            const aTag = li.querySelector('a');
            const dateSpan = li.querySelector('span.date');
            if (!aTag || !dateSpan) return;
            let href = aTag.getAttribute('href');
            let title = aTag.textContent.trim();
            let fullUrl = href;
            if (href.startsWith('/hbzjt')) {
                fullUrl = new URL(href, 'https://zfcxjst.hebei.gov.cn').href;
            }
            result.push({
                title: title,
                url: fullUrl,
                date: dateSpan.textContent.trim(),
                type: types
            });
        });
        sendToAPI(result);
    }

    // Fallback for pages that are not known list pages (customize as needed)
    function fallbackFunction() {
        console.log("不爬虫,页面标题:", document.title);
    }

    // Button click handler: choose the extractor from the current URL
    button.addEventListener('click', () => {
        const currentUrl = window.location.href;
        if (currentUrl.includes('https://zfcxjst.hebei.gov.cn/hbzjt/zcwj/gggs/')) {
            extractHBZJTData_GongGaoGongShi("河北省住房和城乡建设厅,公告公示");
        } else if (currentUrl.includes('https://zfcxjst.hebei.gov.cn/hbzjt/zcwj/tfwj/')) {
            extractHBZJTData_GongGaoGongShi("河北省住房和城乡建设厅,厅发文件");
        } else if (currentUrl.includes('https://zfcxjst.hebei.gov.cn/hbzjt/zcwj/gfxwj/')) {
            extractHBZJTData_GongGaoGongShi("河北省住房和城乡建设厅,厅发规范性文件");
        } else if (currentUrl.includes('https://zfcxjst.hebei.gov.cn/hbzjt/xwzx/szyw/')) {
            extractHBZJTData_XinWenZiXun("河北省住房和城乡建设厅,时政要闻");
        } else if (currentUrl.includes('https://zfcxjst.hebei.gov.cn/hbzjt/xwzx/jsyw/')) {
            extractHBZJTData_XinWenZiXun("河北省住房和城乡建设厅,建设要闻");
        } else if (currentUrl.includes('https://zfcxjst.hebei.gov.cn/hbzjt/xwzx/sxdt/')) {
            extractHBZJTData_XinWenZiXun("河北省住房和城乡建设厅,市县动态");
        } else if (currentUrl.includes('https://zfcxjst.hebei.gov.cn/hbzjt/xwzx/mtgz/')) {
            extractHBZJTData_GongGaoGongShi("河北省住房和城乡建设厅,媒体关注");
        } else if (currentUrl.includes('https://zfcxjst.hebei.gov.cn/hbzjt/xwzx/zcjd/')) {
            extractHBZJTData_XinWenZiXun("河北省住房和城乡建设厅,政策解读");
        } else {
            fallbackFunction();
        }
    });
})();

Sci-tech innovation enterprise spider and reminder import (科技创新型企业爬虫与提醒导入)

1. After each day's crawl, sync the scraped announcements and notices to the database.

2. Search the day's announcements for sci-tech innovation enterprises and, if any are found, send a reminder notification (the reminder itself is not written yet; for now this just fleshes out the data).

Fuzzy keyword matching, plus a list of exclusion words to drop irrelevant entries, is used to find the matching records.

Check for announcements/notices about sci-tech innovation enterprises:

# Check whether today's data contains any sci-tech innovation enterprise listings

import pymysql
import requests
from gxt_spider import get_industry
from kjt_spider import get_sci_kjt
from sdszf_spider import get_sci_sdszf

def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def query_today_kc_enterprises():
    keywords = [
        "科技型中小企业",
        "高新技术企业",
        "众创空间",
        "科技领军企业",
        "技术先进型服务企业",
        "技术创新示范企业",
        "专精特新",
        "科技企业",
        "瞪羚",
        "独角兽",
        "科技小巨人企业",
        '小巨人']
    not_contain_keywords = ["取消","组织申报","认定和复核","申报","补助名单","绩效评价"]
    sql = build_sql_query(keywords, not_contain_keywords)

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()

            return {
                "total": len(results),
                "list": results
            }
    finally:
        connection.close()

def build_sql_query(keywords, not_contain_keywords):
    like_conditions = " OR ".join([f"title LIKE '%{keyword}%'" for keyword in keywords])
    not_like_conditions = " and ".join([f"title NOT LIKE '%{not_contain_keyword}%'" for not_contain_keyword in not_contain_keywords])
    sql = f"""
        SELECT 
        CASE type
            WHEN '1' THEN '山东省科学技术厅'
            WHEN '2' THEN '山东省工业和信息化厅'
            WHEN '3' THEN '山东省人民政府'
            ELSE '未知类型'
        END AS type_name, date, title, url FROM `sci_spider`
        WHERE ({like_conditions}) 
        AND ({not_like_conditions})
        AND DATE(create_date) = DATE(NOW())
    """
    return sql
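
A note on build_sql_query: it interpolates the keywords directly into the SQL text. That is acceptable for this fixed, internal keyword list, but unsafe for anything user-supplied. A parameterized variant is a small change; a sketch (build_sql_query_safe is my name for it, not from the original):

def build_sql_query_safe(keywords, not_contain_keywords):
    # Same query shape, but with %s placeholders so pymysql escapes every value
    like_conditions = " OR ".join(["title LIKE %s"] * len(keywords))
    not_like_conditions = " AND ".join(["title NOT LIKE %s"] * len(not_contain_keywords))
    sql = f"""
        SELECT title, url, date, type FROM `sci_spider`
        WHERE ({like_conditions})
        AND ({not_like_conditions})
        AND DATE(create_date) = DATE(NOW())
    """
    params = [f"%{k}%" for k in keywords + not_contain_keywords]
    return sql, params

# Usage: cursor.execute(*build_sql_query_safe(keywords, not_contain_keywords))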


def mail_sender(content):
    import smtplib
    from email.mime.text import MIMEText
    from email.header import Header
    # Third-party SMTP service
    mail_host = "smtp.163.com"  # SMTP server
    mail_user = "18631839859@163.com"  # username
    mail_pass = "GENGs7dM45TJDH6y"  # authorization code
    sender = '18631839859@163.com'
    receivers = ['wonder1999@126.com']  # recipients; any mailbox (QQ mail etc.) works

    # message = MIMEText(content, 'plain', 'utf-8')
    message = MIMEText(content, 'html', 'utf-8')
    message['From'] = Header("科技型中小企业通知", 'utf-8')
    message['To'] = Header("科技型中小企业", 'utf-8')

    subject = '科技型中小企业通知'
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtpObj = smtplib.SMTP()
        smtpObj.connect(mail_host, 25)  # 25 is the standard SMTP port
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, message.as_string())
        print("邮件发送成功")
    except smtplib.SMTPException:
        print("Error: 无法发送邮件")


def wx_web_hook(data):
    """
    Send a Markdown-formatted message through a WeCom (企业微信) webhook
    :param data: dict with the notification data; must contain 'total' and 'list' keys
    :return: None
    """
    # Webhook address (replace the key with your own)
    webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=ef84945d-2247-4f09-ac0b-be7a6607c24e"

    # Build the Markdown content
    content = f"**找到 {data['total']} 条疑似符合条件的记录:**\n"
    for row in data['list']:
        content += (
            f"- [{row['title']}]({row['url']}) "
            f"<font color=\"comment\">{row['date']}</font> "
            f"<font color=\"warning\">{row['type_name']}</font>\n"
        )

    # Build the request body
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "content": content
        }
    }
    # Send the request and handle the response
    try:
        response = requests.post(webhook_url, json=payload)
        response.raise_for_status()  # raise on HTTP errors
        result = response.json()

        if result.get("errcode") == 0:
            print("✅ 消息发送成功")
        else:
            print(f"❌ 消息发送失败: {result.get('errmsg')}")

    except requests.exceptions.RequestException as e:
        print(f"⚠️ 请求异常: {e}")

if __name__ == '__main__':
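    # Run the three spiders first (gxt → type 2, kjt → type 1, sdszf → type 3), then scan today's rows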
    get_industry(1, 2)
    get_sci_kjt(1, 1)
    get_sci_sdszf(1, 3)
    data = query_today_kc_enterprises()
    title = f"找到 {data['total']} 条疑似符合条件的记录:"
    for row in data['list']:
        print(row)

    if data['total'] > 0:
        wx_web_hook(data)
        # mail_sender('测试消息')
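
The check is meant to run once a day, right after the crawl. A cron entry is the simplest scheduler for that; a sketch, assuming the script above is saved as check_sci_enterprises.py (both the path and the file name here are hypothetical):

# Run the daily check at 08:30 and append the output to a log
30 8 * * * /usr/bin/python3 /opt/spiders/check_sci_enterprises.py >> /var/log/sci_spider.log 2>&1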

Department of Industry and Information Technology (工信厅) spider

import re
import time
import pymysql
import requests


# Database connection
def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '2' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        # No matching row yet (or the query failed): return '' so the caller
        # falls back to its default date
        return ''
    finally:
        connection.close()


def get_industry(page_num, type):
    url = (f'http://gxt.shandong.gov.cn/col/col15201/index.html?uid=586830&pageNum={page_num}')

    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers, timeout=30)
            response.encoding = 'utf-8'
            response = response.text
            break
        except requests.RequestException:
            # Back off for half an hour, then retry
            print("请求失败,尝试睡眠一会(半小时)")
            time.sleep(60 * 30)
            print("睡眠结束,继续运行...")

    da = re.findall(r'<div class="bottom">            <span>                (.*?)            </span>', response)
    in_url = re.findall(r'target="_blank" href="(.*?)">', response)
    content = re.findall(r'<a title="(.*?)" target="_blank"', response)

    for i in range(0, len(da)):
        print(str(i+1) + ' : ' + da[i][0:10] + ' : '+content[i]+ ' : ' + in_url[i])

    if len(da)*2 != len(in_url) or len(da)*2 != len(content):
        print("数据不完整,跳过插入")
        return

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default to the epoch when the table is empty

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
               INSERT INTO `my_database_test`.`sci_spider` 
               (`title`, `url`, `date`, `type`, `create_date`) 
               VALUES (%s, %s, %s, %s, NOW())
               """
            count = 0
            for i in range(len(da)):
                if da[i][0:10] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], in_url[i], da[i][0:10], type))

        connection.commit()
        print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()

if __name__ == '__main__':
    get_industry(1, 2)

Department of Science and Technology (科技厅) spider

import re
import time
import pymysql
import requests


def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '1' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        # No matching row yet: return '' so the caller falls back to a default date
        return ''
    finally:
        connection.close()


def get_sci_kjt(page_num, type):
    url = (f'http://kjt.shandong.gov.cn/col/col13360/index.html?uid=85651&pageNum={page_num}')
    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers, timeout=30)
            response.encoding = 'utf-8'
            response = response.text
            break
        except requests.RequestException:
            # Back off for half an hour, then retry
            print("请求失败,尝试睡眠一会(半小时)")
            time.sleep(60 * 30)
            print("睡眠结束,继续运行...")

    da = re.findall(r'<span class="pull-right">(.*?)</span>', response)
    sci_url = re.findall(r'href="(.*?)" class="ellipsis-line-clamp">', response)
    content = re.findall(r'<s></s>(.*?)</a></li>', response)

    for i in range(0, len(da)):
        print(str(i+1) + ' : ' + da[i][0:10] + ' : '+content[i]+ ' : ' + sci_url[i])

    if len(da) != len(sci_url) or len(da) != len(content):
        print("数据不完整,跳过插入")
        return

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default to the epoch when the table is empty

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
            INSERT INTO `my_database_test`.`sci_spider` 
            (`title`, `url`, `date`, `type`, `create_date`) 
            VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                if da[i] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], sci_url[i], da[i], type))

        connection.commit()
        print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()


if __name__ == '__main__':
    get_sci_kjt(1, 1)

Shandong Provincial People's Government spider

import re
import time
import pymysql
import requests


def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '3' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        # No matching row yet: return '' so the caller falls back to a default date
        return ''
    finally:
        connection.close()


def get_sci_sdszf(page_num, type):
    url = (f'http://www.shandong.gov.cn/col/col94237/index.html?uid=633233&pageNum={page_num}')
    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers, timeout=30)
            response.encoding = 'utf-8'
            response = response.text
            break
        except requests.RequestException:
            # Back off for half an hour, then retry
            print("请求失败,尝试睡眠一会(半小时)")
            time.sleep(60 * 30)
            print("睡眠结束,继续运行...")

    # Extract the dates
    da = re.findall(r'<span>\s*(\d{4}-\d{2}-\d{2})\s*</span>', response)
    # Extract the links
    sci_url = re.findall(r'href="(.*?)"\s+target="_blank"\s+title="', response)
    # Extract the titles (the title attribute)
    content = re.findall(r'\s+target="_blank"\s+title="(.*?)"', response)
    # Sanity check: the three lists must line up one-to-one
    print(len(da), len(sci_url), len(content))

    for i in range(0, len(da)):
        print(str(i+1) + ' : ' + da[i][0:10] + ' : '+content[i]+ ' : ' + sci_url[i])

    if len(da) != len(sci_url) or len(da) != len(content):
        print("数据不完整,跳过插入")
        return

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default to the epoch when the table is empty

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
            INSERT INTO `my_database_test`.`sci_spider` 
            (`title`, `url`, `date`, `type`, `create_date`) 
            VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                if da[i] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], sci_url[i], da[i], type))

        connection.commit()
        print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()


if __name__ == '__main__':
    get_sci_sdszf(1, 3)
Writing an API service in Python (python编写接口服务)

Implement a simple API service in Python that can be called via GET or POST and returns response data. Create a file named api_server.py and add the following code:

import flask
from flask import request, jsonify
from rescode.constants import ResponseCode
from spider.qiXinSpider import getQiXinCompInfo
'''
flask: a web framework; the @server.route() decorator turns an ordinary
function into an HTTP endpoint.
'''
# Create the service: this Python file itself becomes the server
server = flask.Flask(__name__)
# server.config['JSON_AS_ASCII'] = False
# Route and allowed request methods for the endpoint
@server.route('/python-api/getCompTageFromQiXin', methods=['GET', 'POST'])
def getCompTageFromQiXin():
    try:
        # Read the httpUrl parameter (from the query string or form data)
        httpUrl = request.values.get('httpUrl')
        if not httpUrl:
            return jsonify(ResponseCode.PARAM_REQUIRED), 400

        if 'www.sdxyjq.com:8080' in httpUrl:
            httpUrl = httpUrl.replace('www.sdxyjq.com:8080', 'www.sdxyjq.com')
        #  Call the function in qiXinSpider.py; it takes:
        #  - the 信用金桥 (sdxyjq.com) page URL to scrape
        #  - the path to the Chrome binary, e.g. D:\\APP\\TestChrome2\\Application\\chrome.exe
        comp_info = getQiXinCompInfo(httpUrl, 'D:\\APP\\TestChrome2\\Application\\chrome.exe')
        data = {
            "httpUrl" : httpUrl,
            "qiXinSpider" : comp_info,
            "compName":comp_info['baseInfo']['ename']
        }
        return jsonify({**ResponseCode.SUCCESS, "data": data}), 200
    except Exception:
        # Catch-all exception handler
        return jsonify(ResponseCode.INTERNAL_ERROR), 500

@server.errorhandler(404)
def not_found(error):
    return jsonify({ "code": 404,  "message": "接口不存在" }), 404
@server.errorhandler(500)
def internal_error(error):
    return jsonify(ResponseCode.INTERNAL_ERROR), 500

if __name__ == '__main__':
    server.run(debug=True, port=8888, host='0.0.0.0')
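
A quick way to exercise the endpoint from another process, assuming the service is reachable on localhost:8888 (the httpUrl value below is a made-up placeholder):

import requests

resp = requests.get(
    "http://127.0.0.1:8888/python-api/getCompTageFromQiXin",
    params={"httpUrl": "http://www.sdxyjq.com/examplePage"},  # hypothetical target page
)
print(resp.status_code, resp.json())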

The spider script

# -*- encoding:utf-8 -*-
import time
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options
from seleniumwire import webdriver as wiredriver
from bs4 import BeautifulSoup
import requests
import gzip
import io
import json

# Initialize selenium-wire
def initialize_driver(chromePath: str):
    # Configure Chrome options
    chrome_options = Options()
    chrome_options.add_argument('--disable-gpu')  # disable GPU acceleration so request interception works reliably
    chrome_options.add_argument('--headless')  # do not open a browser window
    chrome_options.add_argument('--ignore-certificate-errors')  # ignore certificate errors
    # Point at the specified Chrome binary
    chrome_options.binary_location = chromePath
    # Initialize the WebDriver with these options
    driver = wiredriver.Chrome(options=chrome_options)
    return driver

# Resolve the 启信宝 (qixin.com) URL embedded in the page's iframe
def get_qixin_url(url):
    if 'www.sdxyjq.com:8080' in url:
        url = url.replace('www.sdxyjq.com:8080', 'www.sdxyjq.com')

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36"
    }
    response = requests.get(url, headers=headers)
    html_content = response.text
    soup = BeautifulSoup(html_content, 'html.parser')
    iframe = soup.find('iframe')
    qiXinUrl = ''
    if iframe:
        src = iframe.get('src')
        qiXinUrl = src

    return qiXinUrl

# Decode an intercepted response body into JSON
def parse_response_body(response_body_binary):
    try:
        # Check for the gzip magic bytes (0x1f 0x8b)
        is_gzip = response_body_binary.startswith(b'\x1f\x8b')
        if is_gzip:
            with gzip.GzipFile(fileobj=io.BytesIO(response_body_binary), mode='rb') as f:
                return json.loads(f.read().decode('utf-8'))
        else:
            # Not compressed: decode straight to JSON
            return json.loads(response_body_binary.decode('utf-8'))
    except Exception as e:
        print(f"格式化请求体失败: {e}")
        return None
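
# A quick self-test of the magic-byte check above (every gzip stream starts
# with the two bytes 0x1f 0x8b); uncomment to verify:
# blob = gzip.compress(json.dumps({"ok": 1}).encode('utf-8'))
# assert blob[:2] == b'\x1f\x8b'
# assert parse_response_body(blob) == {"ok": 1}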

# Find the first intercepted request whose URL contains `keyword` and return
# its response body (the parameter is renamed so it no longer shadows the
# imported requests module)
def extract_response_body(captured_requests, keyword):
    for request in captured_requests:
        # request.response can be None if the request never completed
        if keyword in request.url and request.response:
            return request.response.body
    return None

def getQiXinCompInfo(url: str, chromePath: str):
    # Initialize the browser before the try block so `driver` is always
    # bound when the finally clause runs
    driver = initialize_driver(chromePath)
    try:
        # Open the 启信宝 page
        driver.get(get_qixin_url(url))
        time.sleep(3)
        # Wait for the page to finish loading before continuing; adjust the
        # 30 s timeout to your network conditions
        wait = WebDriverWait(driver, 30)
        # Wait until document.readyState becomes "complete"
        wait.until(lambda driver: driver.execute_script("return document.readyState") == "complete")
        # All requests intercepted by selenium-wire
        captured = driver.requests
        # Company tag info: getEntLabel
        res_getEntLabel = extract_response_body(captured, 'getEntLabel')
        if res_getEntLabel is not None:
            res_getEntLabel = parse_response_body(res_getEntLabel)
        else:
            res_getEntLabel = ''
        # Company address info: getGeocode
        res_getGeocode = extract_response_body(captured, 'getGeocode')
        if res_getGeocode is not None:
            res_getGeocode = parse_response_body(res_getGeocode)
        else:
            res_getGeocode = ''
        # Business registration info: getEntBasicInfoNew
        res_getEntBasicInfoNew = extract_response_body(captured, 'getEntBasicInfoNew')
        if res_getEntBasicInfoNew is not None:
            res_getEntBasicInfoNew = parse_response_body(res_getEntBasicInfoNew)
        else:
            res_getEntBasicInfoNew = ''

        return {
            'baseInfo': res_getEntBasicInfoNew,
            'tagInfo': res_getEntLabel,
            'addressInfo': res_getGeocode,
        }
    finally:
        # Close the browser
        driver.quit()

What is Flask?

Flask is a lightweight web framework written in Python that provides a flexible foundation for building web applications and RESTful APIs. Flask's design philosophy is "simple and extensible": it bundles no database layer or ORM (object-relational mapping) tooling, leaving developers free to choose their own stack.

Core features of Flask

  1. Lightweight and flexible

    • No mandatory database or ORM; choose whatever technology fits (SQLite, MySQL, MongoDB, and so on).
    • Templating is provided by Jinja2 out of the box and can be swapped for another engine.
  2. Routing system

    • Decorators map URL paths onto plain Python functions (see the sketch after this list).
  3. Extensibility

    • Third-party extensions add functionality, for example:

      • Flask-SQLAlchemy: database access.
      • Flask-RESTful: rapid construction of RESTful APIs.
      • Flask-Login: user authentication.
      • Flask-JWT: JWT-based authentication.
  4. Developer-friendly

    • Built-in debug mode picks up code changes immediately.
    • Supports unit and integration testing.
  5. Community support

    • An active open-source community and rich documentation make it well suited to rapid development and learning.
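
Item 2 is easiest to see in code. A minimal sketch of decorator-based routing (a hypothetical app, not taken from the post), which also shows the test client and debug mode from item 4:

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/hello/<name>', methods=['GET'])
def hello(name):
    # The decorator maps the URL path onto this plain function;
    # <name> in the path arrives as the `name` argument
    return jsonify({"message": f"hello, {name}"})

if __name__ == '__main__':
    # The built-in test client exercises a route without starting a server
    with app.test_client() as client:
        print(client.get('/hello/world').get_json())  # {'message': 'hello, world'}
    app.run(port=5000, debug=True)  # debug mode reloads on code changes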

Typical Flask use cases

  1. Small web applications

    • Good for quickly building personal blogs, dashboards, internal tools, and the like.
  2. RESTful API development

    • Building data interfaces (e.g. JSON APIs), common in projects with separate front and back ends.
  3. Microservice architectures

    • Its light weight makes it a good fit for standalone microservice modules.
  4. Learning web development

    • The simple API and routing design make it an ideal tool for learning web development.

Flask vs. Django

Feature              Flask                                          Django
Design philosophy    Lightweight, flexible, minimal feature set     Heavyweight, full-featured, batteries-included
Default components   No bundled ORM; Jinja2 templating (swappable)  Built-in ORM (Django ORM) and template engine
Learning curve       Low (simple and direct)                        High (feature-rich but complex)
Typical scenarios    Small projects, APIs, fine-grained control     Large enterprise projects, rapid full-stack development