#### Source code for technorati.py ####


# Client name and version; together with URL below they are formatted into
# the ``version`` attribute of the ``opener`` class.
NAME = 'Technorati/Python'
VERSION = '0.03'

# Copyright (C) 2003 Phillip Pearson

# Project home page; also formatted into ``opener.version``.
URL = 'http://www.myelin.co.nz/technorati_py/'

# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:

# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# Related work:
#
#    PyTechnorati by Mark Pilgrim:
#        http://diveintomark.org/projects/pytechnorati/
#
#    xmltramp/technorati.py by Aaron Swartz
#        http://www.aaronsw.com/2002/xmltramp/technorati.py

# Human-readable changelog, newest release first.
__history__ = '''

v0.03

    - now supporting the new 'search' command.

v0.02

    - now using the latest version of the API (no .xml URLs, format=
      and version= arguments)

    - you can now get more than just the first page of cosmos results
      (use start= or -s / --start)

    - now throwing an exception when we get an HTTP error

    - '--cosmos' command-line option added (same as --inbound)

    - now supporting all license key locations used by PyTechnorati

v0.01

    initial release
    http://www.myelin.co.nz/post/2003/5/12/#200305124

'''

import urllib, sgmllib, os, sys
from pprint import pprint

def setLicense(license_key):
    """Remember *license_key* as the module-wide default API key.

    The value is stored in the module-level ``LICENSE_KEY`` global.
    Returns None.
    """
    globals()['LICENSE_KEY'] = license_key

def findkey(license_key=None):
    """Find the current user's Technorati API key and remember it.

    Candidates are tried in this order and the first non-empty one wins:

      1. the ``license_key`` argument;
      2. the module-level ``LICENSE_KEY`` (see ``setLicense``);
      3. the ``TECHNORATI_LICENSE_KEY`` environment variable;
      4. the first line of ``.technoratikey``, ``technoratikey.txt`` or
         ``apikey.txt``, looked for in the current directory, then the
         user's home directory, then the directory of the running script.

    The key that is found is stored with ``setLicense`` and returned.
    Raises ``Exception`` if no key can be found.
    """
    def read_key(fn):
        # First line of fn with surrounding whitespace removed, or None when
        # fn is not a regular file (missing, or e.g. a directory).
        if DEBUG:
            print("trying %s" % fn)
        if not os.path.isfile(fn):
            return None
        f = open(fn)
        try:
            return f.readline().strip()
        finally:
            # Close explicitly rather than relying on garbage collection.
            f.close()

    def search_files():
        # sys.argv[0] is the running script, so this is the script's
        # directory.
        script_dir = os.path.split(os.path.abspath(sys.argv[0]))[0]
        for path in ('.', os.path.expanduser('~'), script_dir):
            for leaf in ('.technoratikey',
                         'technoratikey.txt',
                         'apikey.txt'):
                key = read_key(os.path.join(path, leaf))
                if key:
                    return key
        return None

    # ``or`` short-circuits, so the file system is only searched when the
    # cheaper sources come up empty.
    key = (license_key
           or LICENSE_KEY
           or os.environ.get('TECHNORATI_LICENSE_KEY', None)
           or search_files())
    if not key:
        raise Exception("Can't find license key")
    setLicense(key)
    return key

# API key used when a caller does not pass one; assigned by setLicense(),
# which findkey() calls once it has located a key.
LICENSE_KEY = None
# Set to a true value to print each key-file path tried and the raw XML
# returned by call().
DEBUG = 0

class opener(urllib.FancyURLopener):
    """FancyURLopener whose default HTTP error handler raises IOError."""

    # Assembled from the module-level NAME, VERSION and URL constants.
    version = '%s v%s; %s' % (NAME, VERSION, URL)

    def http_error_default(self, url, fp, errcode, errmsg, headers, data=None):
        """Raise IOError naming the HTTP status code and the requested URL."""
        message = "HTTP error %s fetching http:%s" % (errcode, url)
        raise IOError(message)

def _load_cache(path='cache.txt'):
    """Return the response cache saved in *path*, or {} if it is unusable.

    The file is expected to hold the repr() of a dict mapping request URLs
    to response bodies.  A missing or unreadable file, contents that fail
    to evaluate, or a value that is not a dict all yield an empty cache, so
    loading never prevents the module from being imported.
    """
    try:
        f = open(path)
    except IOError:
        # No cache has been saved yet (or it cannot be read): start empty.
        return {}
    try:
        try:
            # SECURITY NOTE: eval() executes whatever expression the file
            # contains.  Only use a cache file that this program wrote and
            # that nobody else can modify.
            cached = eval(f.read())
        except Exception:
            # Corrupt cache: best effort, fall back to an empty one.
            return {}
    finally:
        f.close()
    if not isinstance(cached, dict):
        # Anything but a dict would break the lookups done on the cache.
        return {}
    return cached

# Maps full request URL -> raw response body.
callcache = _load_cache()

class BadUrlError(Exception):
    """Error for a missing or empty URL argument.

    Currently only referenced by the commented-out check in call().
    """
    
def call(proc, args, license_key=None):
    """Invoke a Technorati API procedure and return the raw XML response.

    proc        -- procedure name; it becomes the path of the request URL
                   (this module uses 'cosmos', 'bloginfo', 'getinfo',
                   'outbound' and 'search').
    args        -- dict of query parameters.  It is copied, so the caller's
                   dict is left untouched.
    license_key -- optional API key, handed to findkey().

    The 'key', 'format' ('xml') and 'version' ('0.9') parameters are added
    to every request.  Responses are stored in the module-level
    ``callcache`` keyed by the full request URL; a URL already present
    there is answered from the cache without a new fetch.

    Raises whatever findkey() raises when no key is available, and IOError
    (from ``opener``) on an HTTP error.
    """
    #if args['url'] in (None, ''):
    #    raise BadUrlError("No URL supplied")
    params = dict(args)
    params['key'] = findkey(license_key)
    params['format'] = 'xml'
    params['version'] = '0.9'
    url = 'http://api.technorati.com/%s?%s' % (proc, urllib.urlencode(params))
    print("calling %s" % url)
    if url not in callcache:
        print("(fetching)")
        f = opener().open(url)
        try:
            callcache[url] = f.read()
        finally:
            # Release the connection even if reading fails.
            f.close()
    xml = callcache[url]
    if DEBUG:
        print(xml)
    return xml

def parse(parser, xml):
    """Run *xml* through *parser* and return the parser's ``data``.

    The document is fed in one piece and the parser is closed before its
    ``data`` attribute is read.
    """
    parser.feed(xml)
    parser.close()
    result = parser.data
    return result

class genericParser(sgmllib.SGMLParser):
    """SGML-based parser that turns a Technorati API response into dicts.

    After a response has been fed, ``self.data`` holds the result:

      data[itemsName]       list with one dict per <item> element; an item
                            dict may have the keys 'nearestpermalink',
                            'excerpt', 'linkcreated' and 'weblog'
      data['weblog']        dict for a <weblog> closed inside <result> but
                            outside any <item>
      data['url']           text of a <url> closed outside any <weblog>
      data['rankingstart']  int
      data['inboundblogs'], data['inboundlinks']
                            ints for counts closed inside <result> but
                            outside any <weblog>

    A weblog dict may have the keys 'url', 'name', 'rssurl', 'lastupdate',
    'inboundblogs' and 'inboundlinks'.  Elements without a start_/end_
    handler below do not get a key of their own.

    Structural problems in the response are reported as AssertionError.
    """

    def __init__(self, itemsName):
        """Create a parser that collects <item> dicts under data[itemsName]."""
        sgmllib.SGMLParser.__init__(self)
        self.data = {}
        # 0/1 flags recording which enclosing elements are currently open.
        self.inresult = self.inweblog = self.initem = 0
        self.weblog = None      # dict being filled for the open <weblog>
        self.item = None        # dict being filled for the open <item>
        # The same list object is reachable as self.items and data[itemsName].
        self.data[itemsName] = self.items = []
        self.collector = None   # list of text chunks while capturing, else None

    # -- text capture ------------------------------------------------------

    def collect(self):
        """Start capturing character data; captures must not be nested."""
        assert self.collector is None, "already collecting: parse failure!"
        self.collector = []

    def grab(self):
        """Stop capturing and return the text captured since collect()."""
        s = "".join(self.collector)
        self.collector = None
        return s

    def handle_data(self, s):
        """Keep character data only while a capture is in progress."""
        if self.collector is not None:
            self.collector.append(s)

    # -- shared helpers for the element handlers ---------------------------

    def _begin_item_field(self):
        # Start capturing the text of an element that must sit in an <item>.
        assert self.initem
        self.collect()

    def _store_in_item(self, key):
        # Save the captured text on the item being built.
        self.item[key] = self.grab()

    def _store_in_weblog(self, key):
        # Save the captured text on the weblog being built.  The capture is
        # always ended first so the parser state stays consistent.
        value = self.grab()
        if self.weblog is None:
            raise AssertionError("<%s> element not in <weblog>" % key)
        self.weblog[key] = value

    def _store_count(self, key):
        # Save the captured text as an int on the enclosing weblog or, failing
        # that, on the top-level result.  Blank text counts as 0.
        if self.inweblog:
            target = self.weblog
        elif self.inresult:
            target = self.data
        else:
            raise AssertionError(
                "<%s> element not in <result> or <weblog>" % key)
        text = self.grab().strip()
        if text:
            target[key] = int(text)
        else:
            target[key] = 0

    # -- containers --------------------------------------------------------

    def start_document(self, attrs):
        pass

    def end_document(self):
        pass

    def start_result(self, attrs):
        self.inresult = 1

    def end_result(self):
        self.inresult = 0

    def start_item(self, attrs):
        self.initem = 1
        self.item = {}

    def end_item(self):
        self.initem = 0
        self.items.append(self.item)
        self.item = None

    def start_weblog(self, attrs):
        assert self.initem or self.inresult, "found <weblog> element outside <result> or <item>"
        self.inweblog = 1
        self.weblog = {}

    def end_weblog(self):
        self.inweblog = 0
        # A weblog inside an item belongs to that item; otherwise it
        # describes the result as a whole.
        if self.initem:
            self.item['weblog'] = self.weblog
        elif self.inresult:
            self.data['weblog'] = self.weblog
        else:
            raise AssertionError("<weblog> element not in item or result...?")
        self.weblog = None

    # -- text fields of an <item> ------------------------------------------

    def start_nearestpermalink(self, attrs):
        self._begin_item_field()

    def end_nearestpermalink(self):
        self._store_in_item('nearestpermalink')

    def start_excerpt(self, attrs):
        self._begin_item_field()

    def end_excerpt(self):
        self._store_in_item('excerpt')

    def start_linkcreated(self, attrs):
        self._begin_item_field()

    def end_linkcreated(self):
        self._store_in_item('linkcreated')

    # -- fields of the result or of a <weblog> -----------------------------

    def start_rankingstart(self, attrs):
        self.collect()

    def end_rankingstart(self):
        self.data['rankingstart'] = int(self.grab())

    def start_url(self, attrs):
        self.collect()

    def end_url(self):
        if self.inweblog:
            self.weblog['url'] = self.grab()
        else:
            self.data['url'] = self.grab()

    def start_name(self, attrs):
        self.collect()

    def end_name(self):
        self._store_in_weblog('name')

    def start_rssurl(self, attrs):
        self.collect()

    def end_rssurl(self):
        self._store_in_weblog('rssurl')

    def start_inboundblogs(self, attrs):
        self.collect()

    def end_inboundblogs(self):
        self._store_count('inboundblogs')

    def start_inboundlinks(self, attrs):
        self.collect()

    def end_inboundlinks(self):
        self._store_count('inboundlinks')

    def start_lastupdate(self, attrs):
        self.collect()

    def end_lastupdate(self):
        self._store_in_weblog('lastupdate')

def getCosmos(url, limit=None, start=None, license_key=None):
    """Fetch the 'cosmos' of *url* and return the parsed response dict.

    limit and start, when not None, are sent as integer query parameters.
    The returned dict is the ``data`` of a genericParser: its 'inbound' key
    holds the list of item dicts, next to whatever top-level keys the
    response supplied.
    """
    params = {'url': url}
    for name, value in (('limit', limit), ('start', start)):
        if value is not None:
            params[name] = '%d' % value
    response = call('cosmos', params, license_key)
    return parse(genericParser('inbound'), response)

def getBlogInfo(url, license_key=None):
    """Fetch 'bloginfo' for *url* and return its weblog dict.

    Returns None when the parsed response has no top-level 'weblog' entry.
    """
    response = call('bloginfo', {'url': url}, license_key)
    parsed = parse(genericParser('weblogs'), response)
    if 'weblog' in parsed:
        return parsed['weblog']
    return None

def getInfo(url, license_key=None):
    """Call the 'getinfo' procedure for *url* and return the parsed dict.

    Any <item> elements in the response are listed under the 'result' key.
    """
    response = call('getinfo', {'url': url}, license_key)
    return parse(genericParser('result'), response)

def getOutboundBlogs(url, license_key=None):
    """Call the 'outbound' procedure for *url* and return the parsed dict.

    The item dicts from the response are listed under the 'outbound' key,
    next to whatever top-level keys the response supplied.
    """
    response = call('outbound', {'url': url}, license_key)
    return parse(genericParser('outbound'), response)

def search(query, license_key=None):
    """Call the 'search' procedure with *query* and return the parsed dict.

    The item dicts from the response are listed under the 'search' key.
    """
    response = call('search', {'query': query}, license_key)
    return parse(genericParser('search'), response)

def test(url):
    """Pretty-print a fixed search plus the cosmos, info and outbound data of *url*."""
    pprint(search('"David Sifry"'))
    for fetch in (getCosmos, getBlogInfo, getOutboundBlogs):
        pprint(fetch(url))

def main():
    """Command-line entry point.

    Options:
      -d, --debug           turn on DEBUG output
      -t, --test            run test() on the argument
      -s N, --start N       value for getCosmos' ``start`` parameter
      -u URL, --url URL     argument to use when no positional words are given
      --inbound, --cosmos   run getCosmos
      --info                run getBlogInfo
      --outbound            run getOutboundBlogs
      --search              run search

    The remaining command-line words, joined with single spaces, form the
    argument handed to the selected function.  Its result is pretty-printed
    (test() prints its own output).
    """
    import sys, getopt
    global DEBUG
    functions = {'inbound': getCosmos,
                 'cosmos': getCosmos,
                 'info': getBlogInfo,
                 'outbound': getOutboundBlogs,
                 'search': search,
                 }
    opts, rest = getopt.getopt(sys.argv[1:], 'dts:u:', ('debug', 'test', 'inbound', 'cosmos', 'start=', 'info', 'outbound', 'url=', 'search'))
    arg = " ".join([x for x in rest if x.strip()])
    func = None
    start = None
    url = None
    for opt, val in opts:
        if opt in ('-u', '--url'):
            url = val
        elif opt in ('-s', '--start'):
            start = int(val)
        elif opt in ('-d', '--debug'):
            DEBUG = 1
        elif opt in ('-t', '--test'):
            func = test
        elif opt.startswith('--') and opt[2:] in functions:
            assert func is None, "Only one function (inbound, cosmos, info, outbound or search) may be supplied"
            func = functions[opt[2:]]
    if func is None:
        print("No function supplied; --inbound, --cosmos, --info, --outbound, --search or --test must be specified on the command line")
        return
    if not arg and url is not None:
        # Positional words take precedence; --url is the fallback.
        arg = url
    if start is not None and func is getCosmos:
        # Pass by keyword: getCosmos' second positional parameter is
        # ``limit``, not ``start``.
        r = func(arg, start=start)
    else:
        if start is not None:
            # The other functions take a license key (or nothing) as their
            # second parameter, so the offset must not be forwarded.
            sys.stderr.write("--start is only supported with --inbound/--cosmos; ignoring it\n")
        r = func(arg)
    if func is not test:
        pprint(r)

if __name__ == '__main__':
    # Locate the API key up front so a missing key is reported before any
    # command-line processing.
    findkey()
    main()
    # Save the response cache as the repr() of the dict; this is the
    # 'cache.txt' that is read back when the module is loaded.
    cache_file = open('cache.txt', 'wt')
    try:
        cache_file.write(repr(callcache))
    finally:
        # Close explicitly so the data is flushed to disk.
        cache_file.close()

# [Created with py2html Ver:0.62]

# Valid HTML 4.01!