**1. After each daily crawl, the scraped announcements/notices are synced into the database.**

**2. The day's announcements are then searched for sci-tech innovation enterprise notices; if any are found, a reminder is sent (the reminder itself was not written at first, this simply rounds out the data).**

**3. Matching entries are located with fuzzy keyword search on the title, plus exclusion keywords that drop irrelevant items.**

## Checking the day's announcements for sci-tech innovation enterprises

```python
# Check whether today's data contains sci-tech enterprise list announcements
import pymysql
import requests
from gxt_spider import get_industry
from kjt_spider import get_sci_kjt
from sdszf_spider import get_sci_sdszf


def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def query_today_kc_enterprises():
    keywords = [
        "科技型中小企业", "高新技术企业", "众创空间", "科技领军企业",
        "技术先进型服务企业", "技术创新示范企业", "专精特新", "科技企业",
        "瞪羚", "独角兽", "科技小巨人企业", "小巨人"]
    not_contain_keywords = ["取消", "组织申报", "认定和复核", "申报", "补助名单", "绩效评价"]
    sql = build_sql_query(keywords, not_contain_keywords)
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()
            return {
                "total": len(results),
                "list": results
            }
    finally:
        connection.close()


def build_sql_query(keywords, not_contain_keywords):
    like_conditions = " OR ".join([f"title LIKE '%{keyword}%'" for keyword in keywords])
    not_like_conditions = " AND ".join(
        [f"title NOT LIKE '%{kw}%'" for kw in not_contain_keywords])
    sql = f"""
        SELECT
            CASE type
                WHEN '1' THEN '山东省科学技术厅'
                WHEN '2' THEN '山东省工业和信息化厅'
                WHEN '3' THEN '山东省人民政府'
                ELSE '未知类型'
            END AS type_name,
            date, title, url
        FROM `sci_spider`
        WHERE ({like_conditions})
          AND ({not_like_conditions})
          AND DATE(create_date) = DATE(NOW())
    """
    return sql


def mail_sender(content):
    import smtplib
    from email.mime.text import MIMEText
    from email.header import Header

    # Third-party SMTP service
    mail_host = "smtp.163.com"           # SMTP server
    mail_user = "18631839859@163.com"    # username
    mail_pass = "GENGs7dM45TJDH6y"       # authorization code
    sender = '18631839859@163.com'
    receivers = ['wonder1999@126.com']   # recipients; any mailbox works

    # message = MIMEText(content, 'plain', 'utf-8')
    message = MIMEText(content, 'html', 'utf-8')
    message['From'] = Header("科技型中小企业通知", 'utf-8')
    message['To'] = Header("科技型中小企业", 'utf-8')
    subject = '科技型中小企业通知'
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtpObj = smtplib.SMTP()
        smtpObj.connect(mail_host, 25)   # 25 is the SMTP port
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, message.as_string())
        print("邮件发送成功")
    except smtplib.SMTPException:
        print("Error: 无法发送邮件")


def wx_web_hook(data):
    """
    Send a Markdown-formatted message through a WeCom (企业微信) webhook.
    :param data: dict with 'total' and 'list' keys
    :return: None
    """
    # Webhook URL (replace the key with your own)
    webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=ef84945d-2247-4f09-ac0b-be7a6607c24e"

    # Build the Markdown content
    content = f"**找到 {data['total']} 条疑似符合条件的记录:**\n"
    for row in data['list']:
        content += (
            f"- [{row['title']}]({row['url']}) "
            f"<font color=\"comment\">{row['date']}</font> "
            f"<font color=\"warning\">{row['type_name']}</font>\n"
        )

    # Build the request body
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "content": content
        }
    }

    # Send the request and handle the response
    try:
        response = requests.post(webhook_url, json=payload)
        response.raise_for_status()  # raise on HTTP errors
        result = response.json()
        if result.get("errcode") == 0:
            print("✅ 消息发送成功")
        else:
            print(f"❌ 消息发送失败: {result.get('errmsg')}")
    except requests.exceptions.RequestException as e:
        print(f"⚠️ 请求异常: {e}")


if __name__ == '__main__':
    get_industry(1, 2)
    get_sci_kjt(1, 1)
    get_sci_sdszf(1, 3)
    data = query_today_kc_enterprises()
    for row in data['list']:
        print(row)
    if data['total'] > 0:
        wx_web_hook(data)
    # mail_sender('测试消息')
```
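The SQL above is assembled by string interpolation, which is acceptable while the keyword lists are hardcoded, but a parameterized variant is safer if the filters ever come from outside. Below is a minimal sketch under that assumption, reusing the same `sci_spider` table and `connect_to_database()` helper; the function name `query_today_kc_enterprises_safe` is hypothetical and the `type_name` mapping is omitted for brevity:

```python
def query_today_kc_enterprises_safe(keywords, not_contain_keywords):
    # Build one placeholder per keyword so pymysql escapes every value for us.
    like_sql = " OR ".join(["title LIKE %s"] * len(keywords))
    not_like_sql = " AND ".join(["title NOT LIKE %s"] * len(not_contain_keywords))
    sql = f"""
        SELECT date, title, url, type
        FROM `sci_spider`
        WHERE ({like_sql}) AND ({not_like_sql})
          AND DATE(create_date) = DATE(NOW())
    """
    params = [f"%{kw}%" for kw in keywords] + [f"%{kw}%" for kw in not_contain_keywords]
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, params)
            results = cursor.fetchall()
            return {"total": len(results), "list": results}
    finally:
        connection.close()
```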
## Spider for the Department of Industry and Information Technology (工信厅)

```python
import re
import time
import pymysql
import requests


# Database connection
def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    # Latest publish date already stored for this source (type = '2')
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '2' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        return ''
    finally:
        connection.close()


def get_industry(page_num, type):
    url = f'http://gxt.shandong.gov.cn/col/col15201/index.html?uid=586830&pageNum={page_num}'
    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers)
            response.encoding = 'utf-8'
            response = response.text
            break
        except:
            print("请求失败,尝试睡眠一会(半小时)")
            sleep_time = 60 * 30
            time.sleep(sleep_time)
            print("睡眠结束,继续运行...")
            continue

    # Dates, links and titles are pulled straight out of the list page markup
    da = re.findall(r'<div class="bottom"> <span> (.*?) </span>', response)
    in_url = re.findall(r'target="_blank" href="(.*?)">', response)
    content = re.findall(r'<a title="(.*?)" target="_blank"', response)

    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + in_url[i])

    # On this list page the link/title patterns appear to match twice per item,
    # hence the *2 consistency check
    if len(da) * 2 != len(in_url) or len(da) * 2 != len(content):
        print("数据不完整,跳过插入")
        return

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default minimum date

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
                INSERT INTO `my_database_test`.`sci_spider`
                    (`title`, `url`, `date`, `type`, `create_date`)
                VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                # Only insert items newer than the latest stored date
                if da[i][0:10] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], in_url[i], da[i][0:10], type))
            connection.commit()
            print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()


if __name__ == '__main__':
    get_industry(1, 2)
```
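All three spiders scrape the list pages with regular expressions, which break as soon as the markup shifts slightly. As a gentler alternative, here is a minimal sketch of the same extraction for the 工信厅 page using BeautifulSoup; the selectors are only inferred from the regexes above, so treat them as assumptions to verify against the live page, and `parse_gxt_list` is a hypothetical helper, not part of the original scripts:

```python
from bs4 import BeautifulSoup


def parse_gxt_list(html):
    # Assumed markup (inferred from the regexes): a[title][target="_blank"] carries
    # the title/link, and div.bottom > span carries the publish date.
    soup = BeautifulSoup(html, "html.parser")
    titles, urls = [], []
    for a in soup.select('a[title][target="_blank"]'):
        titles.append(a.get("title", ""))
        urls.append(a.get("href", ""))
    dates = [span.get_text(strip=True)[0:10] for span in soup.select("div.bottom span")]
    return dates, urls, titles
```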
## Spider for the Department of Science and Technology (科技厅)

```python
import re
import time
import pymysql
import requests


def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    # Latest publish date already stored for this source (type = '1')
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '1' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        return ''
    finally:
        connection.close()


def get_sci_kjt(page_num, type):
    url = f'http://kjt.shandong.gov.cn/col/col13360/index.html?uid=85651&pageNum={page_num}'
    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers)
            response.encoding = 'utf-8'
            response = response.text
            break
        except:
            print("请求失败,尝试睡眠一会(半小时)")
            sleep_time = 60 * 30
            time.sleep(sleep_time)
            print("睡眠结束,继续运行...")
            continue

    # Dates, links and titles from the list page markup
    da = re.findall(r'<span class="pull-right">(.*?)</span>', response)
    sci_url = re.findall(r'href="(.*?)" class="ellipsis-line-clamp">', response)
    content = re.findall(r'<s></s>(.*?)</a></li>', response)

    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + sci_url[i])

    if len(da) != len(sci_url) or len(da) != len(content):
        print("数据不完整,跳过插入")
        return

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default minimum date

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
                INSERT INTO `my_database_test`.`sci_spider`
                    (`title`, `url`, `date`, `type`, `create_date`)
                VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                # Only insert items newer than the latest stored date
                if da[i] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], sci_url[i], da[i], type))
            connection.commit()
            print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()


if __name__ == '__main__':
    get_sci_kjt(1, 1)
```

## Spider for the Shandong Provincial People's Government (山东省人民政府)

```python
import re
import time
import pymysql
import requests


def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    # Latest publish date already stored for this source (type = '3')
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '3' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        return ''
    finally:
        connection.close()


def get_sci_sdszf(page_num, type):
    url = f'http://www.shandong.gov.cn/col/col94237/index.html?uid=633233&pageNum={page_num}'
    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers)
            response.encoding = 'utf-8'
            response = response.text
            break
        except:
            print("请求失败,尝试睡眠一会(半小时)")
            sleep_time = 60 * 30
            time.sleep(sleep_time)
            print("睡眠结束,继续运行...")
            continue

    # Extract publish dates
    da = re.findall(r'<span>\s*(\d{4}-\d{2}-\d{2})\s*</span>', response)
    # Extract links
    sci_url = re.findall(r'href="(.*?)"\s+target="_blank"\s+title="', response)
    # Extract titles (the title attribute)
    content = re.findall(r'\s+target="_blank"\s+title="(.*?)"', response)

    # return
    print(len(da), len(sci_url), len(content))
    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + sci_url[i])

    if len(da) != len(sci_url) or len(da) != len(content):
        print("数据不完整,跳过插入")
        return

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default minimum date

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
                INSERT INTO `my_database_test`.`sci_spider`
                    (`title`, `url`, `date`, `type`, `create_date`)
                VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                # Only insert items newer than the latest stored date
                if da[i] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], sci_url[i], da[i], type))
            connection.commit()
            print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()


if __name__ == '__main__':
    get_sci_sdszf(1, 3)
```
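All three spiders decide what to insert by comparing publish dates against the newest date already stored, so several items published on the same day can be skipped or re-inserted across runs. A more robust pattern is to deduplicate on the article URL. Below is a minimal sketch under the assumption that a unique index has been added on `sci_spider.url` (the original schema does not necessarily have one); `insert_items` is a hypothetical helper, not part of the original scripts:

```python
# Assumes: ALTER TABLE `sci_spider` ADD UNIQUE KEY `uk_url` (`url`);
# INSERT IGNORE silently skips rows whose url already exists, so a spider can
# insert everything it scraped without any date comparison.
INSERT_SQL = """
    INSERT IGNORE INTO `my_database_test`.`sci_spider`
        (`title`, `url`, `date`, `type`, `create_date`)
    VALUES (%s, %s, %s, %s, NOW())
"""


def insert_items(connection, rows, type):
    # rows: list of (title, url, date) tuples scraped from a list page
    inserted = 0
    with connection.cursor() as cursor:
        for title, url, date in rows:
            # pymysql returns the affected row count: 1 for new rows, 0 for duplicates
            inserted += cursor.execute(INSERT_SQL, (title, url, date, type))
    connection.commit()
    return inserted
```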