feedgenerator: ATOM part of FeedEntry finished

This commit is contained in:
Lars Kiesow 2013-04-22 17:51:51 +02:00
parent 751867d393
commit 471ea2b5a3

View file

@ -17,6 +17,8 @@ import dateutil.tz
class FeedGenerator:
__feed_entries = []
## ATOM
# http://www.atomenabled.org/developers/syndication/
# required
@ -50,7 +52,7 @@ class FeedGenerator:
__rss_cloud = None
__rss_copyright = None
__rss_docs = 'http://www.rssboard.org/rss-specification'
__rss_generator = None
__rss_generator = 'Lernfunk3 FeedGenerator'
__rss_image = None
__rss_language = None
__rss_lastBuildDate = datetime.now(dateutil.tz.tzutc())
@ -84,7 +86,7 @@ class FeedGenerator:
return val
def atom_str(self):
def __create_atom(self):
feed = etree.Element('feed', xmlns='http://www.w3.org/2005/Atom')
if self.__atom_feed_xml_lang:
feed.attrib['{http://www.w3.org/XML/1998/namespace}lang'] = \
@ -174,11 +176,123 @@ class FeedGenerator:
subtitle = etree.SubElement(feed, 'subtitle')
subtitle.text = self.__atom_subtitle
return etree.tostring(feed, pretty_print=True)
'''
outFile = open('homemade.xml', 'w')
doc.write(outFile)
'''
for entry in self.__feed_entries:
entry.atom_entry(feed)
return feed, doc
def atom_str(self, pretty=False):
feed, doc = self.__create_atom()
return etree.tostring(feed, pretty_print=pretty)
def atom_file(self, filename):
feed, doc = self.__create_atom()
with open(filename, 'w') as f:
doc.write(f)
def __create_rss(self):
feed = etree.Element('rss', version='2.0')
doc = etree.ElementTree(feed)
channel = etree.SubElement(feed, 'channel')
if not ( self.__rss_title and self.__rss_link and self.__rss_description ):
raise ValueError('Required fields not set')
title = etree.SubElement(channel, 'title')
title.text = self.__rss_title
link = etree.SubElement(channel, 'link')
link.text = self.__rss_link
link = etree.SubElement(channel, 'description')
link.text = self.__rss_description
if self.__rss_category:
for cat in self.__rss_category:
category = etree.SubElement(channel, 'category')
category.text = cat['value']
if cat.get('domain'):
category.attrib['domain'] = cat['domain']
if self.__rss_cloud:
cloud = etree.SubElement(channel, 'cloud')
cloud.attrib['domain'] = self.__rss_cloud.get('domain')
cloud.attrib['port'] = self.__rss_cloud.get('port')
cloud.attrib['path'] = self.__rss_cloud.get('path')
cloud.attrib['registerProcedure'] = self.__rss_cloud.get(
'registerProcedure')
cloud.attrib['protocol'] = self.__rss_cloud.get('protocol')
if self.__rss_copyright:
copyright = etree.SubElement(channel, 'copyright')
copyright.text = self.__rss_copyright
if self.__rss_docs:
docs = etree.SubElement(channel, 'docs')
docs.text = self.__rss_docs
if self.__rss_generator:
generator = etree.SubElement(channel, 'generator')
generator.text = self.__rss_generator
if self.__rss_image:
image = etree.SubElement(channel, 'image')
image.attrib['url'] = self.__rss_image.get('url')
image.attrib['title'] = self.__rss_image['title'] \
if self.__rss_image.get('title') else self.__rss_title
image.attrib['link'] = self.__rss_image['link'] \
if self.__rss_image.get('link') else self.__rss_link
if self.__rss_image.get('width'):
image.attrib['width'] = self.__rss_image.get('width')
if self.__rss_image.get('height'):
image.attrib['height'] = self.__rss_image.get('height')
if self.__rss_image.get('description'):
image.attrib['description'] = self.__rss_image.get('description')
if self.__rss_language:
language = etree.SubElement(channel, 'language')
language.text = self.__rss_language
if self.__rss_lastBuildDate:
lastBuildDate = etree.SubElement(channel, 'lastBuildDate')
lastBuildDate.text = self.__rss_lastBuildDate.strftime(
'%a, %e %b %Y %H:%M:%S %z')
if self.__rss_managingEditor:
managingEditor = etree.SubElement(channel, 'managingEditor')
managingEditor.text = self.__rss_managingEditor
if self.__rss_pubDate:
pubDate = etree.SubElement(channel, 'pubDate')
pubDate.text = self.__rss_pubDate.strftime(
'%a, %e %b %Y %H:%M:%S %z')
if self.__rss_rating:
rating = etree.SubElement(channel, 'rating')
rating.text = self.__rss_rating
if self.__rss_skipHours:
skipHours = etree.SubElement(channel, 'skipHours')
for h in self.__rss_skipHours:
hour = etree.SubElement(skipHours, 'hour')
hour.text = str(h)
if self.__rss_skipDays:
skipDays = etree.SubElement(channel, 'skipDays')
for d in self.__rss_skipDays:
day = etree.SubElement(skipDays, 'day')
day.text = d
if self.__rss_textInput:
textInput = etree.SubElement(channel, 'textInput')
textInput.attrib['title'] = self.__rss_textInput.get('title')
textInput.attrib['description'] = self.__rss_textInput.get('description')
textInput.attrib['name'] = self.__rss_textInput.get('name')
textInput.attrib['link'] = self.__rss_textInput.get('link')
if self.__rss_ttl:
ttl = etree.SubElement(channel, 'ttl')
ttl.text = self.__rss_ttl
if self.__rss_webMaster:
webMaster = etree.SubElement(channel, 'webMaster')
webMaster.text = self.__rss_webMaster
return feed, doc
def rss_str(self, pretty=False):
feed, doc = self.__create_rss()
return etree.tostring(feed, pretty_print=pretty)
def rss_file(self, filename):
feed, doc = self.__create_rss()
with open(filename, 'w') as f:
doc.write(f)
def title(self, title=None):
@ -257,6 +371,8 @@ class FeedGenerator:
'''Get or set link data. An link element is a dict with the fields href,
rel, type, hreflang, title, and length. Href is mandatory for ATOM.
RSS only supports one link with URL only.
:param link: Dict or list of dicts with data.
:param replace: Add or replace old data.
@ -274,8 +390,9 @@ class FeedGenerator:
set(['href', 'rel', 'type', 'hreflang', 'title', 'length']),
set(['href']),
{'rel':['alternate', 'enclosure', 'related', 'self', 'via']} )
# RSS only needs the URL:
self.__rss_link = [ l['href'] for l in self.__atom_link ]
# RSS only needs one URL. We use the first link for RSS:
if len(self.__atom_link) > 0:
self.__rss_link = self.__atom_link[0]['href']
# return the set with more information (atom)
return self.__atom_link
@ -299,6 +416,7 @@ class FeedGenerator:
rss_cat['value'] = cat['label'] if cat.get('label') else cat['term']
if cat.get('schema'):
rss_cat['domain'] = cat['schema']
self.__rss_category.append( rss_cat )
return self.__atom_category
@ -401,10 +519,10 @@ class FeedGenerator:
:param description: Description/Subtitle of the channel.
'''
return subtitle( description )
return self.subtitle( description )
def subtitle(self, docs=None):
def docs(self, docs=None):
if not docs is None:
self.__rss_docs = docs
return self.__rss_docs
@ -523,29 +641,38 @@ class FeedGenerator:
return self.__rss_webMaster
def add_entry(self, feedEntry=None):
if feedEntry is None:
feedEntry = FeedEntry()
self.__feed_entries.append( feedEntry )
return feedEntry
def add_item(self, item=None):
return self.add_entry(item)
class FeedEntry:
'''
# ATOM
# required
id
title
updated
__atom_id = None
__atom_title = None
__atom_updated = datetime.now(dateutil.tz.tzutc())
# recommended
author
content
link
summary
__atom_author = None
__atom_content = None
__atom_link = None
__atom_summary = None
# optional
category
contributor
source
rights
__atom_category = None
__atom_contributor = None
__atom_source = None
__atom_rights = None
'''
# RSS
author
category
@ -565,6 +692,288 @@ class FeedEntry:
title
'''
def __ensure_format(self, val, allowed, required, allowed_values={}):
if not val:
return None
# Make shure that we have a list of dicts. Even if there is only one.
if not isinstance(val, list):
val = [val]
for elem in val:
if not isinstance(elem, dict):
raise ValueError('Invalid data (value is no dictionary)')
if not set(elem.keys()) <= allowed:
raise ValueError('Data contains invalid keys')
if not set(elem.keys()) >= required:
raise ValueError('Data contains not all required keys')
for k,v in allowed_values.iteritems():
if elem.get(k) and not elem[k] in v:
raise ValueError('Invalid value for %s' % k )
return val
def atom_entry(self, feed):
entry = etree.SubElement(feed, 'entry')
if not ( self.__atom_id and self.__atom_title and self.__atom_updated ):
raise ValueError('Required fields not set')
id = etree.SubElement(entry, 'id')
id.text = self.__atom_id
title = etree.SubElement(entry, 'title')
title.text = self.__atom_title
updated = etree.SubElement(entry, 'updated')
updated.text = self.__atom_updated.isoformat()
# An entry must contain an alternate link if there is no content element.
if not self.__atom_content:
if not True in [ l.get('type') == 'alternate' \
for l in self.__atom_link or [] ]:
raise ValueError('Entry must contain an alternate link or '
+ 'a content element.')
# Add author elements
for a in self.__atom_author or []:
# Atom requires a name. Skip elements without.
if not a.get('name'):
continue
author = etree.SubElement(entry, 'author')
name = etree.SubElement(author, 'name')
name.text = a.get('name')
if a.get('email'):
email = etree.SubElement(author, 'email')
email.text = a.get('email')
if a.get('uri'):
email = etree.SubElement(author, 'url')
email.text = a.get('uri')
if self.__atom_content:
content = etree.SubElement(entry, 'content')
if self.__atom_content.get('src'):
content.attrib['src'] = self.__atom_content['src']
elif self.__atom_content.get('content'):
content.text = self.__atom_content.get('content')
for l in self.__atom_link or []:
link = etree.SubElement(entry, 'link', href=l['href'])
if l.get('rel'):
link.attrib['rel'] = l['rel']
if l.get('type'):
link.attrib['type'] = l['type']
if l.get('hreflang'):
link.attrib['hreflang'] = l['hreflang']
if l.get('title'):
link.attrib['title'] = l['title']
if l.get('length'):
link.attrib['length'] = l['length']
if self.__atom_summary:
summary = etree.SubElement(entry, 'summary')
summary.text = self.__atom_summary
for c in self.__atom_category or []:
cat = etree.SubElement(feed, 'category', term=c['term'])
if c.get('schema'):
cat.attrib['schema'] = c['schema']
if c.get('label'):
cat.attrib['label'] = c['label']
# Add author elements
for c in self.__atom_contributor or []:
# Atom requires a name. Skip elements without.
if not c.get('name'):
continue
contrib = etree.SubElement(feed, 'contributor')
name = etree.SubElement(contrib, 'name')
name.text = c.get('name')
if c.get('email'):
email = etree.SubElement(contrib, 'email')
email.text = c.get('email')
if c.get('uri'):
email = etree.SubElement(contrib, 'url')
email.text = c.get('uri')
if self.__atom_rights:
rights = etree.SubElement(feed, 'rights')
rights.text = self.__atom_rights
def title(self, title=None):
if not title is None:
self.__atom_title = title
self.__rss_title = title
return self.__atom_title
def id(self, id=None):
if not id is None:
self.__atom_id = id
return self.__atom_id
def updated(self, updated=None):
'''Set or get the updated value which indicates the last time the entry
was modified in a significant way.
The value can either be a string which will automatically be parsed or a
datetime.datetime object. In any case it is necessary that the value
include timezone information.
:param updated: The modification date.
:returns: Modification date as datetime.datetime
'''
if not updated is None:
if isinstance(updated, basestr):
updated = dateutil.parser.parse(updated)
if not isinstance(updated, datetime.datetime):
ValueError('Invalid datetime format')
if updated.tzinfo is None:
ValueError('Datetime object has no timezone info')
self.__atom_updated = updated
self.__rss_lastBuildDate = updated
return self.__atom_updated
def author(self, author=None, replace=False, **kwargs):
'''Get or set autor data. An author element is a dict containing a name,
an email adress and a uri. Name is mandatory for ATOM, email is mandatory
for RSS.
:param author: Dict or list of dicts with author data.
:param replace: Add or replace old data.
Example::
>>> author( { 'name':'John Doe', 'email':'jdoe@example.com' } )
[{'name':'John Doe','email':'jdoe@example.com'}]
>>> author([{'name':'Mr. X'},{'name':'Max'}])
[{'name':'John Doe','email':'jdoe@example.com'},
{'name':'John Doe'}, {'name':'Max'}]
>>> author( name='John Doe', email='jdoe@example.com', replace=True )
[{'name':'John Doe','email':'jdoe@example.com'}]
'''
if author is None and kwargs:
author = kwargs
if not author is None:
if replace or self.__atom_author is None:
self.__atom_author = []
self.__atom_author += self.__ensure_format( author,
set(['name', 'email', 'uri']), set(['name']))
return self.__atom_author
def content(self, content=None, src=None):
if not src is None:
self.__atom_content = {'src':src}
elif not content is None:
self.__atom_content = {'content':content}
return self.__atom_content
def link(self, link=None, replace=False, **kwargs):
'''Get or set link data. An link element is a dict with the fields href,
rel, type, hreflang, title, and length. Href is mandatory for ATOM.
RSS only supports one link with URL only.
:param link: Dict or list of dicts with data.
:param replace: Add or replace old data.
Example::
link(...)
'''
if link is None and kwargs:
link = kwargs
if not link is None:
if replace or self.__atom_link is None:
self.__atom_link = []
self.__atom_link += self.__ensure_format( link,
set(['href', 'rel', 'type', 'hreflang', 'title', 'length']),
set(['href']),
{'rel':['alternate', 'enclosure', 'related', 'self', 'via']} )
# RSS only needs one URL. We use the first link for RSS:
if len(self.__atom_link) > 0:
self.__rss_link = self.__atom_link[0]['href']
# return the set with more information (atom)
return self.__atom_link
def summary(self, summary=None):
if not summary is None:
self.__atom_summary = summary
return self.__atom_summary
def category(self, category=None, replace=False, **kwargs):
if category is None and kwargs:
category = kwargs
if not category is None:
if replace or self.__atom_category is None:
self.__atom_category = []
self.__atom_category += self.__ensure_format(
category,
set(['term', 'schema', 'label']),
set(['term']) )
# Map the ATOM categories to RSS categories. Use the atom:label as
# name or if not present the atom:term. The atom:schema is the
# rss:domain.
self.__rss_category = []
for cat in self.__atom_category:
rss_cat = {}
rss_cat['value'] = cat['label'] if cat.get('label') else cat['term']
if cat.get('schema'):
rss_cat['domain'] = cat['schema']
self.__rss_category.append( rss_cat )
return self.__atom_category
def contributor(self, contributor=None, replace=False, **kwargs):
if contributor is None and kwargs:
contributor = kwargs
if not contributor is None:
if replace or self.__atom_contributor is None:
self.__atom_contributor = []
self.__atom_contributor += self.__ensure_format( contributor,
set(['name', 'email', 'uri']), set(['name']))
return self.__atom_contributor
def published(self, published=None):
'''Set or get the published value which ontains the time of the initial
creation or first availability of the entry.
The value can either be a string which will automatically be parsed or a
datetime.datetime object. In any case it is necessary that the value
include timezone information.
:param published: The creation date.
:returns: Creation date as datetime.datetime
'''
if not published is None:
if isinstance(published, basestr):
published = dateutil.parser.parse(published)
if not isinstance(published, datetime.datetime):
ValueError('Invalid datetime format')
if published.tzinfo is None:
ValueError('Datetime object has no timezone info')
self.__atom_published = published
self.__rss_lastBuildDate = published
return self.__atom_published
def rights(self, rights=None):
if not rights is None:
self.__atom_rights = rights
return self.__atom_rights
if __name__ == '__main__':
fg = FeedGenerator()
fg.id('http://lernfunk.de/_MEDIAID_123')
@ -580,4 +989,19 @@ if __name__ == '__main__':
fg.subtitle('This is a cool feed!')
fg.link( href='http://larskiesow.de/test.atom', rel='self' )
fg.language('de')
print fg.atom_str()
fe = fg.add_entry()
fe.id('http://lernfunk.de/_MEDIAID_123#1')
fe.title('First Element')
fe.content('''Lorem ipsum dolor sit amet, consectetur adipiscing elit. Tamen
aberramus a proposito, et, ne longius, prorsus, inquam, Piso, si ista
mala sunt, placet. Aut etiam, ut vestitum, sic sententiam habeas aliam
domesticam, aliam forensem, ut in fronte ostentatio sit, intus veritas
occultetur? Cum id fugiunt, re eadem defendunt, quae Peripatetici,
verba.''')
fe.summary('Lorem ipsum dolor sit amet, consectetur adipiscing elit...')
fe.link( href='http://example.com', rel='alternate' )
fe.author( name='Lars Kiesow', email='lkiesow@uos.de' )
print fg.atom_str(pretty=True)
#print fg.rss_str(pretty=True)