feedgenerator

A simple tool to create various feeds
git clone https://git.ortlepp.eu/feedgenerator.git/
Log | Files | Refs | README | LICENSE

heise.py (6016B)


      1 # -*- coding: utf-8 -*-
      2 
      3 import requests
      4 import sqlite3
      5 import datetime
      6 import feedparser
      7 from bs4 import BeautifulSoup
      8 from common import AtomFeed, FeedItem, Config
      9 
     10 class HeiseFeed:
     11 
     12     FEED_NAME = "Heise Online"
     13     FEED_AUTHOR = "Heise Online"
     14     FEED_BASEURL = "https://www.heise.de/"
     15     FEED_ICON = "https://www.heise.de/icons/ho/favicon/favicon-16x16.png"
     16     FEED_LOGO = "https://upload.wikimedia.org/wikipedia/commons/thumb/0/05/Heise_online.svg/320px-Heise_online.svg.png"
     17 
     18     FEED_URL_NETZPOLITIK = "https://www.heise.de/rss/heise-Rubrik-Netzpolitik.rdf"
     19     FEED_URL_IT = "https://www.heise.de/rss/heise-Rubrik-IT.rdf"
     20 
     21     SQLITE_SQL_CREATE = "CREATE TABLE IF NOT EXISTS heise (key TEXT, title TEXT, content TEXT, link TEXT, created INTEGER, export INTEGER)"
     22     SQLITE_SQL_CHECK = "SELECT COUNT(*) FROM heise WHERE key = ? or link = ?"
     23     SQLITE_SQL_INSERT = "INSERT INTO heise (key, title, content, link, created, export) VALUES (?, ?, ?, ?, ?, ?)"
     24     SQLITE_SQL_CLEAN = "DELETE FROM heise WHERE created < ?"
     25     SQLITE_SQL_GET = "SELECT title, content, link, created FROM heise WHERE export = 1 ORDER BY created DESC"
     26 
     27     IGNORE_TITLE = Config().get_heise_ignoretitle()
     28     IGNORE_URL = Config().get_heise_ignoreurl()
     29 
     30 
     31     def __is_on_ignore_list(self, string, list):
     32         if len(string) == 0:
     33             return False
     34         for item in list:
     35             if item in string:
     36                 return True
     37         return False
     38 
     39 
     40     def __read_article_content(self, link):
     41         desired_elements = ["h1", "h2", "h3", "h4", "h5", "h6", "p", "ul", "a-code", "pre"]
     42 
     43         try:
     44             request = requests.get(link + "?seite=all")
     45 
     46             if request.status_code == 200:
     47                 html = BeautifulSoup(request.text, "html.parser")
     48                 content = ""
     49 
     50                 try:
     51                     intro = html.find("p", {"class", "a-article-header__lead"}).text.strip()
     52                     content = content + "<p><strong>" + intro + "</strong></p>"
     53                 except:
     54                     pass
     55 
     56                 article = html.find("div", {"class", "article-content"})
     57 
     58                 for element in article:
     59                     if element.name in desired_elements:
     60                         content = content + str(element)
     61 
     62                 return content
     63 
     64             else:
     65                 return "Reading article failed, HTTP status code was " + str(request.status_code)
     66         except:
     67             return "Reading article failed"
     68 
     69 
     70     def __read_feed(self, link):
     71         now = int(str(datetime.datetime.now().timestamp()).split('.')[0])
     72         config = Config()
     73         connection = sqlite3.connect(config.get_database())
     74 
     75         try:
     76             cursor = connection.cursor()
     77             cursor.execute(self.SQLITE_SQL_CREATE)
     78             connection.commit()
     79 
     80             # Delete db content older than 5 days
     81             threshold = now - 432000
     82             cursor.execute(self.SQLITE_SQL_CLEAN, (threshold,))
     83             connection.commit()
     84 
     85             request = requests.get(link)
     86 
     87             if request.status_code == 200:
     88                 feed = feedparser.parse(request.text)
     89 
     90                 for entry in feed.entries:
     91                     title = entry.title
     92                     key = title.strip().lower()
     93                     link = entry.link
     94                     index = entry.link.rindex("?")
     95                     if index > 0:
     96                         link = link[0:index]
     97 
     98                     if self.__is_on_ignore_list(title, self.IGNORE_TITLE):
     99                         continue
    100                     if self.__is_on_ignore_list(link, self.IGNORE_URL):
    101                         continue
    102 
    103                     count = 0
    104                     cursor.execute(self.SQLITE_SQL_CHECK, (key, link))
    105                     rows = cursor.fetchall()
    106 
    107                     for row in rows:
    108                         count = row[0]
    109 
    110                     if count == 0:
    111                         content = self.__read_article_content(link)
    112                         export = 1
    113 
    114                         cursor.execute(self.SQLITE_SQL_INSERT, (key, title, content, link, now, export))
    115                         connection.commit()
    116 
    117             else:
    118                 title = "Reading feed failed"
    119                 key = title.strip().lower()
    120                 content = "HTTP status code was " + str(request.status_code)
    121                 cursor.execute(self.SQLITE_SQL_INSERT, (key, title, content, self.FEED_BASEURL, now))
    122                 connection.commit()
    123 
    124         except:
    125             title = "Reading feed failed"
    126             key = title.strip().lower()
    127             content = "Error while fetching the feed"
    128             cursor.execute(self.SQLITE_SQL_INSERT, (key, title, content, self.FEED_BASEURL, now))
    129             connection.commit()
    130 
    131         finally:
    132             connection.commit()
    133             connection.close()
    134 
    135 
    136     def create_feed(self, feedfile, maxitems):
    137         self.__read_feed(self.FEED_URL_NETZPOLITIK)
    138         self.__read_feed(self.FEED_URL_IT)
    139 
    140         feed = AtomFeed(self.FEED_NAME, self.FEED_AUTHOR, self.FEED_BASEURL, datetime.datetime.now(), self.FEED_ICON, self.FEED_LOGO)
    141         config = Config()
    142         connection = sqlite3.connect(config.get_database())
    143 
    144         try:
    145             cursor = connection.cursor()
    146             cursor.execute(self.SQLITE_SQL_GET)
    147             rows = cursor.fetchall()
    148             added = 0
    149 
    150             for row in rows:
    151                 feed.add_item(FeedItem(row[0], datetime.datetime.fromtimestamp(int(row[3])), self.FEED_AUTHOR, row[1], row[2]))
    152                 added = added + 1
    153                 if added >= maxitems:
    154                     break
    155 
    156         except:
    157             error_title = "Feed creation failed"
    158             error_content = "<p>Error while creating the feed</p>"
    159             feed.add_item(FeedItem(error_title, datetime.datetime.now(), self.FEED_AUTHOR, error_content, self.WEBSITE_URL))
    160         finally:
    161             connection.commit()
    162             connection.close()
    163 
    164         return feed.write_feed(feedfile)