Beating Google With CouchDB, Celery and Whoosh (Part 3)
In this series I’ll show you how to build a search engine using standard Python tools like Django, Whoosh and CouchDB. In this post we’ll start crawling the web and filling our database with the contents of pages.
One of the rules we set down was to not request a page too often. If, by accident, we try to retrieve a page
more than once a week, then we don’t want that request to actually make it to the internet. To help prevent this
we’ll extend the Page class we created in the last post with a function called get_by_url.
This static method will take a url and return the Page object that represents it, retrieving the page if we
don’t already have a copy. You could create this as an independent function, but I prefer to use static
methods to keep things tidy.
We only actually want to retrieve the page from the internet in one of the three tasks that we’re going to
create, so we’ll give get_by_url a parameter, update, that enables us to return None
if we don’t have a copy of the page.
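@staticmethod
def get_by_url(url, update=True):
    # Look the URL up in CouchDB via the page/by_url view.
    r = settings.db.view("page/by_url", key=url)
    if len(r.rows) == 1:
        doc = Page.load(settings.db, r.rows[0].value)
        # A stored copy that is still recent enough is returned
        # without touching the network.
        if doc.is_valid():
            return doc
    elif not update:
        # No stored copy, and the caller doesn't want one fetched.
        return None
    else:
        doc = Page(url=url)
    # The stored copy is stale, or this is a new URL: (re)retrieve it.
    doc.update()
    return doc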
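get_by_url leans on Page.is_valid(), which isn’t shown in this post. As a minimal sketch of the once-a-week rule, assuming validity only depends on when the page was last checked, a freshness test could look like this; is_fresh and MAX_AGE are hypothetical names, not part of the crawler.

```python
from datetime import datetime, timedelta

# Hypothetical constant: the post's rule is "no more than once a week".
MAX_AGE = timedelta(days=7)


def is_fresh(last_checked, now=None, max_age=MAX_AGE):
    """Return True if a page checked at `last_checked` can be reused.

    A page that has never been retrieved (last_checked is None) is never
    fresh, so it will always be fetched.
    """
    if last_checked is None:
        return False
    if now is None:
        now = datetime.now()
    return now - last_checked < max_age
```

Passing now explicitly keeps the check deterministic, which makes it easy to test.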
The key line in the static method is doc.update(). This calls the function that retrieves the page and
makes sure we respect the robots.txt file. Let’s look at what happens in that function.
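# Module-level imports for update() (Python 2 standard library plus Django)
import time
from datetime import datetime
from urllib2 import Request, urlopen
from urlparse import urlparse
from django.conf import settings
from django.core.cache import cache

def update(self):
    parse = urlparse(self.url)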
We need to split up the given URL so we know whether it’s a secure connection or not, and because we limit our connections to each domain we need to get the domain as well. Python has a module, urlparse, that does the hard work for us.
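For example, here is what urlparse gives back for a sample URL (the URL itself is made up). The crawler’s Python 2 code imports it from the urlparse module; in Python 3 the same function lives in urllib.parse, so this sketch tries both.

```python
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2, as in the crawler code

parse = urlparse("https://www.example.com:8443/docs/page.html?lang=en#intro")

# scheme tells us whether the connection is secure (http vs https);
# netloc is the host (plus any port), which is what we throttle on.
print(parse.scheme)  # https
print(parse.netloc)  # www.example.com:8443
print(parse.path)    # /docs/page.html
```

Note that netloc keeps an explicit port, so www.example.com and www.example.com:8443 end up as different keys when the crawler throttles by netloc.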
    robotstxt = RobotsTxt.get_by_domain(parse.scheme, parse.netloc)
    # robots.txt rules apply to paths, so check the full URL, not just the domain.
    if not robotstxt.is_allowed(self.url):
        return False
In the previous post we discussed parsing the robots.txt file, and here we make sure that if we’re not
allowed to index a page, then we don’t.
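The RobotsTxt class from the previous post isn’t repeated here, but Python’s standard-library parser shows why the check needs the full URL: robots.txt rules are matched against URL paths, so a bare domain name never matches a Disallow line. This sketch uses RobotFileParser (urllib.robotparser in Python 3, robotparser in Python 2) with a made-up robots.txt; it is an illustration, not the crawler’s own class.

```python
try:
    from urllib.robotparser import RobotFileParser  # Python 3
except ImportError:
    from robotparser import RobotFileParser  # Python 2

# A made-up robots.txt that blocks one directory for every crawler.
ROBOTS_TXT = [
    "User-agent: *",
    "Disallow: /private/",
]

rp = RobotFileParser()
# Record the rules as freshly fetched; without a timestamp, can_fetch()
# assumes robots.txt hasn't been read yet and refuses every URL.
rp.modified()
rp.parse(ROBOTS_TXT)

full_url = "http://example.com/private/report.html"
print(rp.can_fetch("*", full_url))       # False: /private/ is disallowed
print(rp.can_fetch("*", "example.com"))  # True: a bare domain has no /private/ path
```

The last call is the trap: passing only the domain quietly allows pages that robots.txt forbids, so is_allowed should be given the full URL.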
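    # Wait while another request to this domain was made recently...
    while cache.get(parse.netloc) is not None:
        time.sleep(1)
    # ...then claim the domain for the next 10 seconds.
    cache.set(parse.netloc, True, 10)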
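As with the code to parse robots.txt files, we need to make sure we don’t access the same domain too
often. The cache entry for a domain expires after 10 seconds, so we sleep until no other request to that
domain has been made in that window and then claim the domain ourselves.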
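The loop above checks the cache and then sets it in two separate steps, so two workers that wake up at the same moment could both see the domain as free. Django’s cache also has add(), which stores a key only if it isn’t already present and returns whether it did; looping on add() turns the check and the claim into a single call. The sketch below models this with a small in-memory stand-in; TTLCache, wait_for_domain and the injectable clock are hypothetical, written so the behaviour can be tested without real sleeps. The real cross-worker guarantee needs a shared backend whose add is atomic, such as memcached.

```python
import time


class TTLCache(object):
    """Hypothetical in-memory stand-in for a shared cache.

    add() follows the semantics of Django's cache.add(): store the key only
    if it is missing or expired, and return whether it was stored. This toy
    version is not thread-safe; the cross-worker guarantee needs a shared
    backend with an atomic add, such as memcached.
    """

    def __init__(self, clock=time.time):
        self._clock = clock
        self._entries = {}  # key -> (value, expiry time)

    def add(self, key, value, timeout):
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[1] > now:
            return False  # the key is still live: someone claimed it recently
        self._entries[key] = (value, now + timeout)
        return True


def wait_for_domain(cache, domain, interval=10, sleep=time.sleep):
    """Block until `domain` is free, then claim it for `interval` seconds.

    Returns the number of one-second waits, which makes it easy to test.
    """
    waits = 0
    # add() checks and claims in one call, so there is no gap between
    # "is the domain free?" and "mark it as busy".
    while not cache.add(domain, True, interval):
        sleep(1)
        waits += 1
    return waits
```

Because TTLCache.add() takes the same (key, value, timeout) arguments as Django’s cache.add(), wait_for_domain could be handed Django’s cache object and parse.netloc in place of the two-step loop.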
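    req = Request(self.url, None, { "User-Agent": settings.USER_AGENT })
    resp = urlopen(req)
    # Only HTML pages are stored; anything else (images, PDFs, ...) is skipped.
    if not resp.info().get("Content-Type", "").startswith("text/html"):
        return False
    # Assume UTF-8, replacing undecodable bytes rather than crashing the task.
    self.content = resp.read().decode("utf8", "replace")
    self.last_checked = datetime.now()
    self.store(settings.db)
    return True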
Finally, once we’ve checked we’re allowed to access a page and haven’t accessed another page on the same
domain recently, we use the standard library’s urllib2 module to download the content of the page and store it
in our database.
Now that we can retrieve a page, we need to add it to the task processing system. To do this we’ll create
a Celery task to retrieve the page. The task just needs to call the get_by_url static method we created
earlier and then, if the page is downloaded, trigger a second task to parse out all of the links.
from celery.task import task

@task
def retrieve_page(url):
    page = Page.get_by_url(url)
    if page is None:
        return
    # Link extraction runs as a separate task (covered in the next post).
    find_links.delay(page.id)
You might be asking why the links aren’t parsed immediately after retrieving the page. They certainly could
be, but a key goal was to enable the crawling process to scale as much as possible. Based on the pages I’ve
crawled so far, each page has around 100 links on it, and the find_links task creates a new retrieve_page
task for the links it finds. If everything shares one queue, these retrieval tasks quickly swamp it, and
other tasks, like calculating the rank of a page, sit behind them and aren’t processed.
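A back-of-the-envelope count shows how fast this grows. Assuming every page has 100 links that haven’t been seen before (in practice duplicates would cut this down), the number of retrieve_page tasks created from a single seed URL is a sum of powers of 100; the helper name below is purely illustrative.

```python
def retrieve_tasks_queued(levels, links_per_page=100):
    """Total retrieve_page tasks created after crawling `levels` levels
    from one seed URL, if every page has `links_per_page` new links.

    Level 1 is the seed's links, level 2 their links, and so on.
    """
    return sum(links_per_page ** level for level in range(1, levels + 1))


for levels in (1, 2, 3):
    print(levels, retrieve_tasks_queued(levels))
# 1 100
# 2 10100
# 3 1010100
```

Each retrieved page also needs processing work such as ranking, and in a single shared queue that work waits behind this backlog.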
Celery provides the tools to make sure that a subset of messages is processed in a timely manner: queues.
Tasks can be assigned to different queues, and daemons can be made to watch a specific set of
queues. If you have a Celery daemon that only watches the queue used by your high-priority tasks, then those
tasks will be processed quickly, however long the other queues get.
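A toy model makes the benefit concrete. It isn’t Celery code: it simulates workers that each take one task per tick from FIFO queues, and counts how long a newly queued rank task waits when a backlog of retrieve tasks is already waiting. The function and task names are illustrative.

```python
from collections import deque


def ticks_until_rank_runs(backlog, separate_queues):
    """Return how many ticks pass before a newly queued "rank" task runs.

    `backlog` retrieve tasks are already waiting when the rank task is
    queued, and each worker takes one task per tick.

    separate_queues=False: one shared FIFO queue and one worker, so the
    rank task joins the back of the line.
    separate_queues=True: a second worker watches only the "process"
    queue, so the rank task doesn't wait behind the retrieve backlog.
    """
    retrieve = deque(["retrieve"] * backlog)
    process = deque()
    if separate_queues:
        process.append("rank")
    else:
        retrieve.append("rank")

    tick = 0
    while True:
        tick += 1
        # Worker watching the retrieve (or shared) queue.
        if retrieve and retrieve.popleft() == "rank":
            return tick
        # Worker watching only the process queue (absent in the shared case).
        if separate_queues and process and process.popleft() == "rank":
            return tick


print(ticks_until_rank_runs(10000, separate_queues=False))  # 10001
print(ticks_until_rank_runs(10000, separate_queues=True))   # 1
```

The gain doesn’t come from the extra worker alone: a second worker on one shared queue would only roughly halve the wait, because the rank task would still be behind the whole backlog.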
We’ll use two queues, one for retrieving the pages and another for processing them. First we need to tell
Celery about the queues (we also need to include the default celery queue here) and then we create a
router class. The router looks at the task name and decides which queue to put it into. Your routing code
could be very complicated, but ours is very straightforward.
CELERY_QUEUES = {
    "retrieve": {
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "retrieve"
    },
    "process": {
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "process"
    },
    "celery": {
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "celery"
    }
}
class MyRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        # Page retrieval gets its own queue; every other task counts as processing.
        if task == "crawler.tasks.retrieve_page":
            return { "queue": "retrieve" }
        else:
            return { "queue": "process" }
CELERY_ROUTES = (MyRouter(), )
The final step is to allow the crawler to be kicked off by seeding it with some URLs. I’ve previously posted about how to create a Django management command and they’re a perfect fit here. The command takes one argument, the url, and creates a Celery task to retrieve it.
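from django.core.management.base import BaseCommand
from crawler.tasks import retrieve_page

class Command(BaseCommand):
    def handle(self, url, **options):
        # Queue the seed URL; the Celery workers take it from there.
        retrieve_page.delay(url)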
We’ve now got a web crawler that is almost complete. In the next post I’ll discuss parsing links out of the HTML, and we’ll look at calculating the rank of each page.
Read part 4.
Comments
Hi Andrew,
I'm really enjoying this topic; I think you are writing about a lot of interesting things all together. I'm trying to follow your posts, but I've found a problem in this part that I didn't manage to solve due to the lack of information about using CouchDB with Celery. When I launch celery:
python rdc_crawler/manage.py celeryd -l info
It fails, raising an unauthorized error; you can see the traceback at http://paste.org/pastebin/view/39372. As far as I can tell, the problem is that Celery doesn't send authorization credentials to CouchDB:
PUT /couchdb%2Fcelery HTTP/1.1
Host: my.host.com
Content-Length:0
Accept: application/json
User-Agent: CouchDB-Python/0.8
My settings are:
BROKER_TRANSPORT = "couchdb"
BROKER_HOST = "my.host.com"
BROKER_PORT = 80
BROKER_USER = "user"
BROKER_PASSWORD = "pass"
BROKER_VHOST = "couchdb/celery"
BROKER_USER_SSL = True
Any hint about how to properly set credentials?
PS: I'm running CouchDB behind an nginx proxy at my.host.com:80/couchdb, with the CouchDB require_valid_user = true option.
Rafael Durán Castañeda
08 Oct 2011
Hi Rafael,
Thanks for the comment but I'm afraid I can't help. I haven't used CouchDB authentication. I suggest you try posting to the CouchDB user list or StackOverflow.com.
Andrew Wilkinson
11 Oct 2011
Hi,
Thanks for your answer, but don't worry, I've got it working. Kombu (Celery's AMQP framework) has a bug, so the credentials weren't used. I've fixed it and sent a pull request on GitHub, so this will be fixed in new Kombu versions; in the meantime I'm using my own patched version.
Bye
Rafael Durán Castañeda
11 Oct 2011
Hi Andrew,
I think there is a small mistake in the robotstxt.is_allowed call: it should take the full URL as its argument, not parse.netloc.
Cheers, Thomas
Thomas Bikeev (@thomas_bikeev)
18 Nov 2011
Thanks Thomas, you're quite right. I've updated the post and the code on github with your fix.
Andrew Wilkinson
25 Nov 2011