# Source code for birding.spout
"""Storm Spout classes."""
import datetime
import itertools
from streamparse.spout import Spout
from .config import get_config, import_name
def DispatchSpout(*a, **kw):
    """Instantiate the spout class named by the ``Spout`` config entry.

    The configured name is resolved with ``import_name``, passing
    ``'birding.spout'`` as ``default_ns``. Every positional and keyword
    argument is forwarded unchanged to that class's constructor, and the new
    instance is returned.
    """
    config = get_config()
    cls = import_name(config['Spout'], default_ns='birding.spout')
    return cls(*a, **kw)
class TermMethods(object):
    """Helpers that encode a (term, timestamp) pair as a Storm tuple ID.

    The ID is the term and the timestamp joined by a single space. Decoding
    splits on the *last* space only, so a term may contain spaces but the
    timestamp must not.
    """

    @staticmethod
    def pack_tup_id(term, timestamp):
        """Join *term* and *timestamp* with one space to form a tuple ID.

        Example:
        >>> TermMethods.pack_tup_id('search it!', '2015-09-24T14:39:53.429183')
        'search it! 2015-09-24T14:39:53.429183'
        """
        template = '{term} {timestamp}'
        return template.format(term=term, timestamp=timestamp)

    @staticmethod
    def parse_tup_id(tup_id):
        """Split a `pack_tup_id`-style tuple ID back into (term, timestamp).

        Only the final space acts as the separator. An ID that contains no
        space at all comes back as a 1-tuple holding the whole string.

        Example:
        >>> TermMethods.parse_tup_id('search it! 2015-09-24T14:39:53.429183')
        ('search it!', '2015-09-24T14:39:53.429183')
        """
        pieces = tup_id.rsplit(' ', 1)
        return tuple(pieces)
class TermCycleSpout(Spout, TermMethods):
    """Spout that emits configured search terms round-robin, each stamped
    with the current UTC time."""

    def initialize(self, stormconf, context):
        """Initialization steps:

        1. Read the term list from config key ``TermCycleSpout/terms`` and
           build an endless round-robin iterator over it.
        """
        spout_config = get_config()['TermCycleSpout']
        self.terms = spout_config['terms']
        # NOTE(review): if `terms` is empty, the cycle yields nothing and
        # next_tuple's `next()` call raises StopIteration.
        self.term_seq = itertools.cycle(self.terms)

    def next_tuple(self):
        """Next tuple steps:

        1. Take the next term from the cycle, stamp it with the current
           naive UTC time in ISO-8601 form, and emit (term, timestamp)
           using a packed tuple ID built from the same pair.
        """
        term = next(self.term_seq)
        # Naive UTC on purpose: the isoformat() output (no offset suffix)
        # becomes part of the tuple ID.
        now_utc = datetime.datetime.utcnow()
        timestamp = now_utc.isoformat()
        tup_id = self.pack_tup_id(term, timestamp)
        self.emit([term, timestamp], tup_id=tup_id)