birding: streamparse/kafka demo¶
birding is an open source project to produce a stream of recent twitter activity based on a sequence of search terms, using only twitter’s public APIs. It serves as both a standalone project and a demo of distributed real-time computing with Python using Storm/streamparse and Kafka/pykafka.
Problem statement & topology describes the problem and how it fits into a topology. Downloading and running birding describes how to interact with birding for development, demo, or light usage. A tour of birding’s implementation provides a light introduction to internals. Using birding in production discusses how birding is packaged for production use in an existing streamparse project. Configuring birding discusses various options for birding behavior when running locally or in production.
Problem statement & topology¶
Problem Statement¶
Take as input a sequence of terms and timestamps and produce a “filtered firehose” of twitter activity using only twitter’s public APIs, without requiring special API access to twitter or any third party.
Specifics¶
- Input is in the format of (term, timestamp), where term is any string and timestamp is a date/time value in an ISO 8601 format, e.g. 2015-06-25T08:00Z.
- The motivating use-case prefers Python, with input and output via Kafka.
Observations¶
Twitter provides GET search/tweets to get relevant Tweets (status updates) matching a specified query. Any detail not provided in the search results can be accessed with GET statuses/lookup, looking up multiple status updates in a batched request.
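The batched lookup mentioned above can be sketched as follows; the helper name is illustrative (not part of birding), and 100 is GET statuses/lookup's per-request maximum:

```python
# Illustrative helper (not part of birding): chunk status ids into
# comma-separated groups of up to 100, the per-request maximum for
# GET statuses/lookup.
def batch_ids(ids, size=100):
    for start in range(0, len(ids), size):
        yield ','.join(str(i) for i in ids[start:start + size])
```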
The problem has potentially unbounded streams of data, which makes Storm a relevant technology for the solution. Given that the motivating use-case prefers Python with Kafka I/O, streamparse and pykafka are relevant.
Other Goals¶
The solution should:
- Encode best practices about how to use Storm/streamparse and Kafka/pykafka.
- Be fully public & open source to serve as an example project, so it should not depend on anything specific to a particular company or organization. Depending on the publicly scrutable Twitter API is, of course, okay.
- Include basic command-line tools for testing the topology with data and ways to configure things like Twitter authentication credentials.
Downloading and running birding¶
Note
Existing streamparse projects should include the birding Python package, as described in Using birding in production, instead of cloning the birding repository.
The birding project fully automates dependencies for the purposes of development, demo, or light usage. In a terminal on a Unix-like system, clone the birding repository:
git clone https://github.com/Parsely/birding.git
cd birding
Then run:
make run
The birding project makes every effort to detect whether an underlying dependency is unmet. If make run fails, look for messages indicating what is missing or what went wrong. If an error message says that an address is in use, find the other processes on the system currently using the referenced network port and shut them down before running birding again. If an error is unclear, submit an issue that includes a build log and mentions your operating system. To create a build.log:
make run 2>&1 | tee build.log
When birding is running, its console output is verbose as it includes all output of zookeeper, kafka, storm, and streamparse. Note that – as with all streamparse projects – output from the birding code itself ends up in the logs/ directory and not in the console. To stop running birding, issue a keyboard interrupt in the console with Control-C.
Using make run will pick up birding.yml as the project configuration file if it exists in the root directory next to the Makefile. See Configuring birding. This simple birding.yml sets the search terms used by birding:
TermCycleSpout:
  terms:
  - mocking bird
  - carrier pigeon
Data for the project ends up in a directory relative to the project root. Clean runtime data with:
make clean-data
Build docs with make docs and check for Python errors by static analysis with make flakes. Make allows multiple targets at once:
make clean-data flakes run
A tour of birding's implementation¶
Python Twitter Client¶
There are many Python packages for Twitter. The Python Twitter Tools project (pip install twitter) is of interest because:
- It has a command-line application to get twitter activity which includes a straightforward authentication workflow to log into twitter and get OAuth credentials, using a PIN-based workflow.
- It provides APIs in Python which bind to twitter's public APIs in a dynamic and predictable way, where Python attribute and method names translate to URL paths, e.g. twitter.statuses.friends_timeline() retrieves data from http://twitter.com/statuses/friends_timeline.json.
- The OAuth credentials saved by the command-line tool can be readily used when making API calls using the package.
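A minimal sketch of how such a dynamic binding can work — an illustration of the pattern, not the twitter package's actual implementation:

```python
# Sketch: attribute access accumulates a URL path, so attribute and
# method names translate into the API's URL structure. The twitter
# package does this with real HTTP calls; this toy only builds URLs.
class APIBinding:
    def __init__(self, path=()):
        self._path = path

    def __getattr__(self, name):
        return APIBinding(self._path + (name,))

    def url(self):
        return 'http://twitter.com/{}.json'.format('/'.join(self._path))

api = APIBinding()
print(api.statuses.friends_timeline.url())
# http://twitter.com/statuses/friends_timeline.json
```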
Twitter API¶
To ease configuration, birding adds a from_oauth_file() method which creates a Twitter binding using the OAuth credential file created by the twitter command-line application. The twitter command need only be run once to create this file, which is saved in the user home directory at ~/.twitter_oauth. Once that file is in place, the binding is ready for twitter API interactions.
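As a rough sketch of what from_oauth_file() relies on: the credential file holds the OAuth token and secret. The helper name below is hypothetical and the exact file format is an assumption:

```python
import os

# Hypothetical helper (file format is an assumption): read an OAuth
# token and secret from a twitter-CLI-style credential file.
def read_oauth_file(filepath=None):
    filepath = filepath or os.path.expanduser('~/.twitter_oauth')
    with open(filepath) as f:
        token, token_secret = f.read().split()
    return token, token_secret
```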
Search Manager¶
It is useful to solve the problem itself before being concerned with details about the topology. birding's TwitterSearchManager composes the Twitter object into higher-level method signatures which perform the processing steps needed for the given Problem statement & topology. A full interaction before applying Storm is a search for a term followed by a lookup of the resulting tweets.
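That composition can be sketched like this; the method names follow the birding docs, while the class name and response shapes here are assumptions:

```python
# Sketch of the manager pattern: search for a term, then batch-look-up
# the resulting status ids for full tweets. Not birding's exact code.
class SearchManagerSketch:
    def __init__(self, twitter):
        self.twitter = twitter

    def search(self, q):
        # GET search/tweets
        return self.twitter.search.tweets(q=q)

    def lookup_search_result(self, result):
        # GET statuses/lookup with a comma-separated id list
        ids = ','.join(str(s['id']) for s in result['statuses'])
        return self.twitter.statuses.lookup(_id=ids)
```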
Storm Bolts¶
With APIs in place to do the work, Bolt classes provide Storm components:
- TwitterSearchBolt searches the input terms.
- TwitterLookupBolt expands search results into full tweets.
- ElasticsearchIndexBolt indexes the lookup results in elasticsearch.
- ResultTopicBolt publishes the lookup results to Kafka.
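The per-tuple work of the first bolt can be sketched with the Storm machinery stubbed out; in a real streamparse bolt, emit() is inherited from the Bolt base class, and the stub class here is purely illustrative:

```python
# Sketch of the search bolt's process step with Storm stubbed out.
class SearchBoltSketch:
    def __init__(self, manager):
        self.manager = manager
        self.emitted = []

    def emit(self, values):
        # Stand-in for streamparse's Bolt.emit().
        self.emitted.append(values)

    def process(self, values):
        term, timestamp = values
        search_result = self.manager.search(term)
        # Matches the topology's declared stream fields:
        # ["term" "timestamp" "search_result"]
        self.emit([term, timestamp, search_result])
```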
Storm Spouts¶
Spout classes provide Storm components which take birding’s input and provide the source of streams in the topology:
- DispatchSpout dispatches the spout class based on config. See Configuring birding.
- TermCycleSpout cycles through a static list of terms.
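The TermCycleSpout behavior can be sketched as a generator — a sketch of the idea, not birding's implementation:

```python
import itertools
from datetime import datetime, timezone

# Sketch: cycle through a static term list forever, stamping each
# term with the current UTC time in the ISO 8601 form the input uses.
def term_cycle(terms):
    for term in itertools.cycle(terms):
        yield term, datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%MZ')
```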
Storm Topology¶
With Storm components ready for streamparse, a topology can pull it all together. birding's topology uses the Clojure DSL; the streamparse discussion of topologies has more detail. In the topology definition below, note the class references "birding.bolt.TwitterSearchBolt", "birding.bolt.TwitterLookupBolt", and "birding.bolt.ResultTopicBolt". These are full Python namespace references to the birding classes. The names given in the DSL can then be used to wire the components together. For example, the definition of "search-bolt" (python-bolt-spec ...) allows "search-bolt" to be used as input in another bolt, "lookup-bolt" (python-bolt-spec ... {"search-bolt" :shuffle} ...).
(ns birding
  (:use [streamparse.specs])
  (:gen-class))

(defn birding [options]
  [
    ;; spout configuration
    {"term-spout" (python-spout-spec
        options
        ; Dispatch class based on birding.yml.
        "birding.spout.DispatchSpout"
        ["term" "timestamp"]
        :conf {"topology.max.spout.pending", 8}
        )
    }
    ;; bolt configuration
    {"search-bolt" (python-bolt-spec
        options
        ; Use field grouping on term to support in-memory caching.
        {"term-spout" ["term"]}
        "birding.bolt.TwitterSearchBolt"
        ["term" "timestamp" "search_result"]
        :p 2
        )
     "lookup-bolt" (python-bolt-spec
        options
        {"search-bolt" :shuffle}
        "birding.bolt.TwitterLookupBolt"
        ["term" "timestamp" "lookup_result"]
        :p 2
        )
     "elasticsearch-index-bolt" (python-bolt-spec
        options
        {"lookup-bolt" :shuffle}
        "birding.bolt.ElasticsearchIndexBolt"
        []
        :p 1
        )
     "result-topic-bolt" (python-bolt-spec
        options
        {"lookup-bolt" :shuffle}
        "birding.bolt.ResultTopicBolt"
        []
        :p 1
        )
    }
  ]
)
Using birding in production¶
Note
birding is currently alpha software.
If birding itself satisfies project requirements, see the streamparse project's discussion of remote deployment and use sparse submit from a checkout of the birding repository. Otherwise, birding is available on the Python Package Index, which projects can use as a dependency:
pip install birding
Once installed in the Python environment, birding references are available to the topology definition. A project's topology can include python-spout-spec and python-bolt-spec declarations which have class references to the birding.spout and birding.bolt namespaces, respectively. The snippet below illustrates this. The Storm Topology section has more detail.
"search-bolt" (python-bolt-spec
    options
    {"term-spout" ["term"]}
    "birding.bolt.TwitterSearchBolt"
    ["term" "timestamp" "search_result"]
    :p 2)
The streamparse project discusses remote deployment using the sparse submit command. Configuring birding discusses the birding.yml file which is located by the BIRDING_CONF environment variable. Projects using birding should include its configuration file as part of host configuration management or a streamparse submit hook, and likewise set the BIRDING_CONF variable accordingly.
Next, go to Configuring birding.
Configuring birding¶
birding uses a validated configuration file for runtime details. Configuration files use a YAML format. All values have a default (below) and accept values of the same name in the configuration file, which has a default path of birding.yml in the current working directory. If needed, the BIRDING_CONF environment variable can point to the filepath of the configuration file.
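For example (the path here is illustrative):

```shell
# Point birding at a configuration file outside the working directory.
export BIRDING_CONF=/etc/birding/birding.yml
```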
The scope of the configuration file is limited to details of birding itself, not of Storm-related topics. Storm details are in the project topology definition.
When a configuration value is a Python dotted name, it is a string reference to the Python object to import. In general, when the value is just an object name without a full namespace, it is assumed to be in the relevant birding namespace, e.g. LRUShelf is assumed to be birding.shelf.LRUShelf. Respective *_init configuration values specify keyword (not positional) arguments to be passed to the class constructor.
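For example, a configuration overriding a shelf class might look like this (the values are illustrative):

```yaml
TwitterSearchBolt:
  shelf_class: LRUShelf     # short for birding.shelf.LRUShelf
  shelf_init:
    maxsize: 500            # keyword argument to the class constructor
```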
See Using birding in production for further discussion on configuration in production environments.
For advanced API usage, see get_config(). The config includes an Appendix to support any additional values not known to birding, such that these values are available in config['Appendix'] and bypass any validation. This is useful for code which uses birding's config loader and needs to define additional values.
Defaults:
Spout: TermCycleSpout

TermCycleSpout:
  terms:
  - real-time analytics
  - apache storm
  - pypi

SearchManager:
  class: birding.twitter.TwitterSearchManagerFromOAuth
  init: {}

TwitterSearchBolt:
  shelf_class: FreshLRUShelf
  shelf_init: {}
  shelf_expiration: 300

ElasticsearchIndexBolt:
  elasticsearch_class: elasticsearch.Elasticsearch
  elasticsearch_init:
    hosts:
    - localhost: 9200
  index: tweet
  doc_type: tweet

ResultTopicBolt:
  kafka_class: pykafka.KafkaClient
  kafka_init:
    hosts: 127.0.0.1:9092 # comma-separated list of hosts
  topic: tweet
  shelf_class: ElasticsearchShelf
  shelf_init: {}
  shelf_expiration: null

Appendix: {}
Searching Gnip¶
Gnip is Twitter’s enterprise API platform, which birding supports for projects seeking to search at higher rates than allowed in the public API. The configuration snippet below uses Gnip’s APIs instead of Twitter. See Configuring birding for how to configure birding.
SearchManager:
  class: birding.gnip.GnipSearchManager
  init:
    base_url: https://search.gnip.com/accounts/Example
    stream: prod.json
    username: admin@example.org
    password: This.yml.file.should.be.untracked.
See birding API docs for Gnip and GnipSearchManager for underlying behavior, which is minimal.
API¶
class birding.spout.TermCycleSpout

class birding.bolt.TwitterSearchBolt

    initialize(conf, ctx)
        Initialization steps:
        1. Get search_manager_from_config().
        2. Prepare to track searched terms so as to avoid redundant searches.

class birding.bolt.TwitterLookupBolt

    process(tup)
        Process steps:
        1. Stream in (term, timestamp, search_result).
        2. Perform lookup_search_result().
        3. Emit (term, timestamp, lookup_result).

class birding.bolt.ElasticsearchIndexBolt

class birding.bolt.ResultTopicBolt

birding.search.search_manager_from_config()
    Get a SearchManager instance dynamically based on config. config is a dictionary containing class and init keys as defined in birding.config.

class birding.search.SearchManager
    Abstract base class for service object to search for tweets.

class birding.twitter.Twitter(format=u'json', domain=u'api.twitter.com', secure=True, auth=None, api_version=<class 'twitter.api._DEFAULT'>, retry=False)

    classmethod from_oauth_file(filepath=None)
        Get an object bound to the Twitter API using your own credentials.
        The twitter library ships with a twitter command that uses PIN OAuth. Generate your own OAuth credentials by running twitter from the shell, which will open a browser window to authenticate you. Once successfully run, even just one time, you will have a credential file at ~/.twitter_oauth.
        This factory function reuses your credential file to get a Twitter object. (Really, this code is just lifted from the twitter.cmdline module to minimize OAuth dancing.)

class birding.twitter.TwitterSearchManager(twitter)
    Service object to provide fully-hydrated tweets given a search query.

birding.twitter.TwitterSearchManagerFromOAuth()
    Build TwitterSearchManager from user OAuth file. Arguments are passed to birding.twitter.Twitter.from_oauth_file().

class birding.gnip.Gnip(base_url, stream, username, password, **params)
    Simple binding to Gnip search API.

    session_class
        alias of Session

class birding.gnip.GnipSearchManager(*a, **kw)
    Service object to provide fully-hydrated tweets given a search query.

birding.config.get_config(filepath=None, default_loader=None, on_missing=None)
    Get a dict for the current birding configuration.
    The resulting dictionary is fully populated with defaults, such that all valid keys will resolve to valid values. Invalid and extra values in the configuration result in an exception.
    See Configuring birding (module-level docstring) for discussion on how birding configuration works, including filepath loading. Note that a non-default filepath set via env results in an OSError when the file is missing, but the default filepath is ignored when missing.
    This function caches its return values so as to only parse configuration once per set of inputs. As such, treat the resulting dictionary as read-only so as not to accidentally write values which will be seen by other handles of the dictionary.
    Returns: dict of current birding configuration; treat as read-only.

birding.shelf.shelf_from_config()
    Get a Shelf instance dynamically based on config. config is a dictionary containing shelf_* keys as defined in birding.config.

class birding.shelf.Shelf
    Abstract base class for a shelf to track – but not iterate – values. Provides a dict-interface.

    unpack(key, value)
        Unpack value from __getitem__. This is useful for Shelf implementations which require metadata be stored with the shelved values, in which case pack should implement the inverse operation. By default, the value is simply passed through without modification. The unpack implementation is called on __getitem__ and therefore can raise KeyError if packed metadata indicates that a value is invalid.

class birding.shelf.FreshPacker
    Mixin for pack/unpack implementation to expire shelf content.

    expire_after = 300
        Values are no longer fresh after this value, in seconds.

class birding.shelf.LRUShelf(maxsize=1000)
    An in-memory Least-Recently-Used shelf up to maxsize.

class birding.shelf.FreshLRUShelf(maxsize=1000)
    A Least-Recently-Used shelf which expires values.
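The LRU idea behind these shelf classes can be sketched with an OrderedDict — a simplified illustration, not birding's implementation, which also layers in expiration via FreshPacker:

```python
from collections import OrderedDict

# Simplified LRU shelf: dict-like, evicts the least-recently-used
# key once maxsize is exceeded.
class LRUShelfSketch:
    def __init__(self, maxsize=1000):
        self.maxsize = maxsize
        self._store = OrderedDict()

    def __setitem__(self, key, value):
        self._store.pop(key, None)
        self._store[key] = value
        if len(self._store) > self.maxsize:
            self._store.popitem(last=False)  # evict oldest entry

    def __getitem__(self, key):
        value = self._store.pop(key)  # raises KeyError if missing
        self._store[key] = value      # mark as most recently used
        return value

    def __contains__(self, key):
        return key in self._store
```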
To discuss this project, join the streamparse user group.