Nutch 1.0源代码分析

更新时间:2024-03-30 00:39:01 阅读量: 综合文库 文档下载

说明:文章内容仅供预览,部分内容可能不全。下载后的文档,内容与下面显示的完全一致。下载之前请确认下面内容是否您想要的,是否完整无缺。

Nutch 1.0源代码分析(1): Crawl初始化与Injector

文章分类:互联网

从今天开始陆续对Nutch 1.0的工作过程进行分析。从Crawl为起点,先分析爬虫从爬行到索引的过程,再分析查询检索的过程。如有错误,欢迎批评指正!

Crawl类是Nutch爬虫中的一个核心类,它的主要方法就是该类中的main方法,该方法包含了爬虫的整个运行阶段,包括Inject(将初始URL加入到网页库CrawlDb中),Generate(产生待爬行队列),Fetch(抓取网页)和Index(索引)。这里通过分析main方法的执行过程分析nutch爬虫从建立网页库到建立索引的全过程。Crawl类位于org.apache.nutch.crawl包中,接下来就对main方法进行分析。 1.初始化阶段

根据nutch-default.xml,nutch-site.xml,nutch-tool.xml 3个配置文件和用户在命令行输入的参数,配置和初始化爬行深度、线程数、爬行根目录、topN、crawlDb、linkDb、segments、indexes等目录,并初始化injector、generator、fetcher、parser、indexer等。 2.Inject

Inject阶段将初始URL进行合法性检查(UrlNomalizer)如:统一大小写,检查是否为合法URL等和过滤(UrlFilter),如过滤掉用户指定的不爬行的链接之后注入到网页数据库crawlDb中。调用inject(crawlDb, rootUrlDir)方法,该方法在org.apache.nutch.crawl.Injector类中,下面对其进行分析。 inject方法的两个参数,Path crawlDb代表crawlDb的路径,这里是crawl/crawlDb;Path rootUrlDir 代表初始URL的路径。

Java代码

1. // org.apache.nutch.crawl.Injector

2. public void inject(Path crawlDb, Path urlDir) throws IOException {

3. // crawlDb 网页数据库目

录 // urlDir 种子URL目录 4. ... ...

5. Path tempDir =

6. new Path(getConf().get(\ \eger.MAX_VALUE))); 7.

8. // map text input file to a file 9. if (LOG.isInfoEnabled()) {

10. LOG.info(\entries.\

11. }

12. JobConf sortJob = new NutchJob(getConf()); 13. sortJob.setJobName(\

14. FileInputFormat.addInputPath(sortJob, urlDir); 15. sortJob.setMapperClass(InjectMapper.class); 16.

17. FileOutputFormat.setOutputPath(sortJob, tempDir);

18. sortJob.setOutputFormat(SequenceFileOutputFormat.class); 19. sortJob.setOutputKeyClass(Text.class);

20. sortJob.setOutputValueClass(CrawlDatum.class);

21. sortJob.setLong(\illis());

22. JobClient.runJob(sortJob); 23. ... ...

程序的前半部分将输入文件map成的文件。输入文件是用户指定的初始URL,使用的Mapper类是InjectMapper,Reduce阶段不做任何事情,原样输出。输出的目录是刚刚生成的tempDir临时目录,输出的键值对类型是,将injector.current.time属性设置为当前时间。 这里介绍一下CrawlDatum的作用。它是记录数据库crawlDb中每个链接爬行状态的一个类,每次爬行的时候判断这个链接是否爬行过,目前正处于爬行的哪个阶段等等信息都记录在这个类的标志中。

那么在Map阶段具体都做了哪些事呢?需要进一步分析InjectMapper类。该类的configure方法对urlNormalizer、interval、scfiliter、scoreInjected等域进行初始化,为后面的map做准备。其中interval是网页抓取的时间间隔,默认情况下是30天,scfilter是当前nutch使用的ScoringFilter,scoreInjected是inject到crawlDb中的链接得分,默认是1.0。接下来是最重要的map方法: Java代码

1. public void map(WritableComparable key, Text value,

2. OutputCollector output, Reporter reporter)

3. throws IOException {

4. String url = value.toString(); // value is line of text 5. try {

6. url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);

7. url = filters.filter(url); // filter the url

8. } catch (Exception e) {

9. if (LOG.isWarnEnabled()) { LOG.warn(\\

10. url = null; 11. }

12. if (url != null) { // if it passes

13. value.set(url); // collect it

14. CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, interval);

15. datum.setFetchTime(curTime); 16. datum.setScore(scoreInjected); 17. try {

18. scfilters.injectedScore(value, datum); 19. } catch (ScoringFilterException e) { 20. if (LOG.isWarnEnabled()) {

21. LOG.warn(%url +

22. \);

23. }

24. datum.setScore(scoreInjected); 25. }

26. output.collect(value, datum); // 一个URL对应一个状态信息,key是URL,值是状态信息 27. } 28. }

这个map方法的主要功能是将用户输入的初始URL从文本转换为键值对类型,CrawlDatum是一个链接的爬行状态信息。首先使用nomalizer对输入的URL进行规范化,如果不能通过规范化则返回,否则向crawldatum中打入链接的相关信息:当前的爬行状态、链接得分(如果

ScoringFilter中指定了在inject阶段怎样计算注入链接的得分则使用这个得分,否则使用默认的1.0)、抓取时间。最后,收集url和与它对应的爬行状态。 程序返回到inject方法中继续执行: Java代码

1. // merge with existing crawl db 2. if (LOG.isInfoEnabled()) {

3. LOG.info(\); 4. }

5. JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);

6. FileInputFormat.addInputPath(mergeJob, tempDir); 7. mergeJob.setReducerClass(InjectReducer.class); 8. JobClient.runJob(mergeJob);

9. CrawlDb.install(mergeJob, crawlDb); 10.

11. // clean up

12. FileSystem fs = FileSystem.get(getConf()); 13. fs.delete(tempDir, true);

14. if (LOG.isInfoEnabled()) { LOG.info(\15.

16. } // inject方法结束

这部分首先调用CrawlDb的createJob静态方法对接下来的将injected URL合并到crawlDb的MapReduce任务进行配置。createJob将FileInputPath设置为crawl/crawlDb/current,Mapper类设置为CrawlDbFilter,Reducer类设置为CrawlDbReducer,输出路径(outputPath)设置为crawl/crawlDb/下的一个以随机数命名的目录,输出的键值对类型是,此后creatJob返回。

在该方法后,inject方法继续对接下来的MapReduce任务进行配置,增加一个输入路径tempDir(即上次Map任务的输出。这样,Map任务实际有两个输入路径),设置Reducer类为InjectReducer(此处待确认:覆盖creatJob中设置的Reducer?????)。

Map阶段CrawlDbFilter的作用是将crawl/crawldb/current和上一步Map处理(读入初始URL并map)后的键值对经过UrlNormalizer和UrlFilter规范化(如字母大小写等)和过滤(根据用户配置,决定是否保留这个url)后,然后输出到tempDir中。

Reduce阶段InjectReducer的任务是将已存在于current中的网页数据库和新加入在outputPath中的数据库进行合并。如果新加入的网页已存在于current中,则不修改其状态,否则将新加入链接datum的状态从

STATUS_INJECTED改为STATUS_DB_UNFETCHED。这样就完成了新旧数据库的合并 最后,通过CrawlDb.install(mergeJob, crawlDb)方法将tempDir删除。将新的数据库命名为current。

至此,Inject阶段的任务完成,该阶段数据处理的流程图如下:

Nutch 1.0源代码分析(2): Generate

文章分类:互联网 在Inject过后,程序返回到crawl的main方法中继续执行,接下来进入到一个循环,循环的终止条件是达到指定的爬行深度。在循环中依次进行generate和fetch两个操作,每次循环产生一个segment,位于crawl/segments下,以generate方法调用的时间作为这次循环产生目录的名称。首先分析Generate操作。

Generator类位于org.apache.nutch.crawl中,其中的核心方法是public Path generate(Path dbDir, Path segments, int numLists, long topN, long curTime)方法,作用是根据链接得分,产生包含topN个链接的待爬行队列fethclist,并更新网页数据库。crawl正是调用了这个方法进入Generate阶段。其中各个参数的含义是:

Path dbDir crawldb的路径 Path segments segments的路径 int numLists Reduce任务的数量

long topN 用户指定的TopN数量,即每轮爬行选择前TopN个URL加入到fetchlist中

long curTime 调用generate方法的时间

上述方法代码如下: Java代码

1. public Path generate(Path dbDir, Path segments, int numLists, long topN, long curTime) throws IOException { 2.

3. JobConf job = new NutchJob(getConf());

4. boolean filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);

5. return generate(dbDir, segments, numLists, topN, curTime, filter, false); 6. }

它通过配置文件读取是否要进行URL过滤,默认为过滤。然后调用

generate(dbDir, segments, numLists, topN, curTime, filter, false)方法,这个方法是真正进行generate的地方,它产生了一个位于系统临时目录下的目录,命名为tempDir。 Java代码

1. public Path generate(Path dbDir, Path segments,

2. int numLists, long topN, long curTime, boolean filter,

3. boolean force) 4. throws IOException { 5. ... ...

6. JobConf job = new NutchJob(getConf());

7. job.setJobName(\8.

9. if (numLists == -1) { // for politeness make

10. numLists = job.getNumMapTasks(); // a partition per fetch task

11. // 获取Reduce任务的数量,默认是1 12. }

13. if (\ts != 1) { // 如果是local模式,则将reduce的数量置1 14. // override

15. LOG.info(\actly one partition.\16. numLists = 1; 17. }

18. ... ...

19. FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME)); // crawl/crawldb/current

20. job.setInputFormat(SequenceFileInputFormat.class); 21.

22. job.setMapperClass(Selector.class);

23. job.setPartitionerClass(Selector.class); 24. job.setReducerClass(Selector.class); 25.

26. FileOutputFormat.setOutputPath(job, tempDir);

27. job.setOutputFormat(SequenceFileOutputFormat.class); 28. job.setOutputKeyClass(FloatWritable.class);

29. job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);

30. job.setOutputValueClass(SelectorEntry.class); 31. try {

32. JobClient.runJob(job); 33. } catch (IOException e) {

34. LockUtil.removeLockFile(fs, lock); 35. throw e; 36.}

程序首先对几个路径进行配置,然后获取Reducer的数量,如果hadoop当

前的模式为local,则将Reducer的数量numLists设为1。 3.1. 第1次MapReduce

程序接下来将对第1个MapReduce任务进行配置。输入路径InputPath为当前的网页数据库(crawl/crawldb/current),输出路径为临时目录tempDir。Mapper、Partitioner、Reducer类都是Selector类。 首先分析Map任务: Java代码

1. public void map(Text key, CrawlDatum value, OutputCollector output, Reporter reporter)

2. throws IOException { 3. Text url = key; 4. if (filter) {

5. // If filtering is on don't generate URLs that don't pass URLFilters 6. try {

7. if (filters.filter(url.toString()) == null) // 过滤URL

8. return;

9. } catch (URLFilterException e) { 10. if (LOG.isWarnEnabled()) {

11. LOG.warn(\etMessage()

12. + \13. } 14. } 15. }

16. CrawlDatum crawlDatum = value; 17.

18. // check fetch schedule

19. if (!schedule.shouldFetch(url, crawlDatum, curTime)) { // 默认情况下调用DefaultFetchSchedule继承的shouldFetch方法, 20. // 根据crawlDatum的时间和当前时间相比,如果比当前时间更新,则

21. // 可以考虑加入fetch list

22. // \当前时间\可以进行配置,不一定就是实际中的当前时间

23. LOG.debug(\e=\24. return; 25. } 26.

27. LongWritable oldGenTime = (LongWritable)crawlDatum.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);

28. if (oldGenTime != null) { // awaiting fetch & update 29. if (oldGenTime.get() + genDelay > curTime) // still wait for update

30. return; 31. }

32. float sort = 1.0f; 33. try {

34. sort = scfilters.generatorSortValue((Text)key, crawlDatum, sort); // 根据当前使用的ScoringFilter计算该URL在Generate阶段的得分

35. } catch (ScoringFilterException sfe) { 36. if (LOG.isWarnEnabled()) {

37. LOG.warn(\key + \38. } 39. }

40. // sort by decreasing score, using DecreasingFloatComparator

41. sortValue.set(sort);

42. // record generation time

43. crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);

44. entry.datum = crawlDatum; 45. entry.url = (Text)key;

46. output.collect(sortValue, entry); // invert for sort by score 47. }

map首先对输入的URL进行过滤,如果被过滤掉则返回,否则继续执行。接下来,调用shouldFetch方法通过对比当前时间和数据库中该链接的

fetchtime属性的间隔来判断是否考虑抓取该链接,如果不到应该的时间间隔则不怕,但是如果到了,也不一定就抓取,还要看后面的处理。接下来通过调用ScoringFilter的generatorSortValue方法判断在Generate阶段的得分。并将这个得分作为键收集将datum和url封装进SelectorEntry类型的entry对象中,并将entry作为值收集。第1次Map后的结果就是链接得分和它的信息对应的键值对。

接下来分析第一次MapReduce任务的Partitioner: Java代码

1. public int getPartition(FloatWritable key, Writable value, 2. int numReduceTasks) {

3. return hostPartitioner.getPartition(((SelectorEntry)value).url, key,

4. numReduceTasks); 5. }

它根据url代表的主机名进行分区,将同一个主机上的URL交给同一个Reducer处理,这样体现了对该站点的礼貌性(politeness)。第一次MapReduce的Reducer如下: Java代码

1. public void reduce(FloatWritable key, Iterator values, // key是每个链接的得分,values是SelectorEntry类型的得分对应的所有链接

2. OutputCollector output,

3. Reporter reporter) 4. throws IOException { 5.

6. while (values.hasNext() && count < limit) { 7. SelectorEntry entry = values.next(); 8. Text url = entry.url;

9. String urlString = url.toString(); 10. URL u = null; 11.

12. // skip bad urls, including empty and null urls 13. try {

14. u = new URL(url.toString());

15. } catch (MalformedURLException e) {

16. LOG.info(\

17. continue; 18. } 19.

20. String host = u.getHost(); 21. host = host.toLowerCase(); 22. String hostname = host; 23. ... ... 24. try {

25. urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);

26. host = new URL(urlString).getHost(); 27. } catch (Exception e) {

28. LOG.warn(\g (\

29. StringUtils.stringifyException(e) + \30. continue; 31. } 32.

33. // only filter if we are counting hosts 34.// 对同一个host,产生url的最大个数,通过配置文件获得,如果为-1,则没有限// 制

35. if (maxPerHost > 0) { 36.

37. IntWritable hostCount = hostCounts.get(host); 38. if (hostCount == null) {

39. hostCount = new IntWritable(); 40. hostCounts.put(host, hostCount); 41. }

42. // increment hostCount

43. hostCount.set(hostCount.get() + 1); 44. // skip URL if above the limit per host. 45. if (hostCount.get() > maxPerHost) {

46. if (hostCount.get() == maxPerHost + 1) {

47. // remember the raw hostname that is maxed out 48. maxedHosts.add(hostname); 49. if (LOG.isInfoEnabled()) {

50. LOG.info(\axPerHost + \51. } 52. }

53. continue; 54. } 55. }

56. output.collect(key, entry);

57. // Count is incremented only when we keep the URL 58. // maxPerHost may cause us to skip it. 59. count++; 60. } 61. }

limit是每个Reducer最大需要处理的链接的数量,由limit =

job.getLong(CRAWL_TOP_N,Long.MAX_VALUE)/job.getNumReduceTasks()得到,也就是说,是由topN/Reducer的数量决定的。只要每个Reducer没有达到这个最大限度,就从输入的value中取出链接。如果用户配置了

generate.max.per.host属性并设置为正值,则会限制同一个host中产生的链

接数。程序会过滤掉超过指定数目的属于同一主机的URL。reduce方法最终收集通过过滤,且符合数量要求的键值对。

由于每次收集的键值对的数量是受limit限制的,而且reduce输入的value又是根据链接的得分值从高到低排序的,所以当达到limit的限制时,低分的url就被略掉了。

第1次MapReduce任务的输出还是以<得分,链接信息>的形式保存的。 3.2. 第2次MapReduce

在进行第1次MapReduce后,程序返回generate方法中,并进行第2次MapReduce: Java代码

1. ... ...

2. job = new NutchJob(getConf());

3. job.setJobName(\4.

5. job.setInt(\nt()); 6.

7. FileInputFormat.addInputPath(job, tempDir);

8. job.setInputFormat(SequenceFileInputFormat.class); 9.

10. job.setMapperClass(SelectorInverseMapper.class); 11. job.setMapOutputKeyClass(Text.class);

12. job.setMapOutputValueClass(SelectorEntry.class); 13. job.setPartitionerClass(PartitionUrlByHost.class); 14. job.setReducerClass(PartitionReducer.class); 15. job.setNumReduceTasks(numLists); 16.

17.// output是crawl/segments/yyyyMMddHHmmss/crawl_generate 18. FileOutputFormat.setOutputPath(job, output); job.setOutputFormat(SequenceFileOutputFormat.class); 19. job.setOutputKeyClass(Text.class);

20. job.setOutputValueClass(CrawlDatum.class);

21. job.setOutputKeyComparatorClass(HashComparator.class);

此次MapReduce的输入是第一次MapReduce输出到的临时文件,输出路径是segment/目录下的crawl_generate。

下面来分析Mapper的任务,Mapper类是SelectorInverseMapper: Java代码

1. public static class SelectorInverseMapper extends MapReduceBase implements Mapper { 2.

3. public void map(FloatWritable key, SelectorEntry value, OutputCollector output, Reporter reporter) throws IOException {

4. SelectorEntry entry = (SelectorEntry)value; 5. output.collect(entry.url, entry); 6. } 7. }

map方法的作用是将第一次MapReduce输出的以<链接得分,链接信息>为形式的文件转变为形式的文件,以URL作为键值,以便Reducer进行处理。Partitioner以url对应主机名的哈希值作为分发到Reducer的依据,接下来再看看Reducer中的内容,Reducer采用PartitionReducer类: Java代码

1. public static class PartitionReducer extends MapReduceBase 2. implements Reducer { 3.

4. public void reduce(Text key, Iterator values,

5. OutputCollector output, Reporter reporter) throws IOException {

6. while (values.hasNext()) {

7. SelectorEntry entry = values.next(); 8. output.collect(entry.url, entry.datum); 9. } 10. } 11.}

Reducer的作用也十分简单,就是将Mapper的输出进一步转化,形成crawlDb中数据的保存形式。之所以将

selectorEntry>到的转换分两步进行而不全都放在Map阶段是因为转化需要将同一个主机上的链接交给同一个Reducer进行处理。 crawl_generate中包含的链接就是待爬行队列fetch list。 3.3. 第3次MapReduce

在第二次MapReduce执行后,紧接着进行第三次MapReduce任务,这次的目的是更新tempDir中的generate时间信息,并输出到tempDir2中: Java代码

1. Path tempDir2 =

2. new Path(getConf().get(\3. \;

4.

5. job = new NutchJob(getConf());

6. job.setJobName(\

7. job.setLong(Nutch.GENERATE_TIME_KEY, generateTime); 8. FileInputFormat.addInputPath(job, tempDir);

9. FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME)); // 两个InputPath

10. job.setInputFormat(SequenceFileInputFormat.class); 11. job.setMapperClass(CrawlDbUpdater.class); 12. job.setReducerClass(CrawlDbUpdater.class);

13. job.setOutputFormat(MapFileOutputFormat.class); 14. job.setOutputKeyClass(Text.class);

15. job.setOutputValueClass(CrawlDatum.class); 16. FileOutputFormat.setOutputPath(job, tempDir2);

此次MapReduce以第一次MapReduce的输出tmpDir(里面存放着形式的信息)和当前的网页数据库crawl/crawldb/current两个路径为输入,Mapper和Reducer都在CrawlDbUpdater中。 map方法如下: Java代码

1. public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {

2. if (key instanceof FloatWritable) { // tempDir source key是得分,value是该得分对应的所有selector entry 3. SelectorEntry se = (SelectorEntry)value; 4. output.collect(se.url, se.datum); 5. } else {

6. output.collect((Text)key, (CrawlDatum)value); // key是URL,value是datum 7. } 8. }

它将两个输入路径中的不同数据形式统一为的形式。 reduce方法如下: Java代码

1. public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {

2. while (values.hasNext()) {

3. CrawlDatum val = values.next();

4. if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {

5. LongWritable gt = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY); 6. genTime.set(gt.get());

7. if (genTime.get() != generateTime) { 8. orig.set(val); 9. genTime.set(0L); 10. continue; 11. }

12. } else {

13. orig.set(val); 14. } 15. }

16. if (genTime.get() != 0L) {

17. orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime); 18. }

19. output.collect(key, orig); 20. }

Reduce任务的功能是将输入的中的datum的generate时间进行更新后,输出,此时的输出目录tempDir2就是最新的网页数据库。

在第三次MapReduce执行后,generate方法将tempDir2命名为当前的网页数据库,删除旧的crawlDb,返回segment对象。Generate阶段执行完毕。 这一阶段的数据流程图如下:

Nutch 1.0源代码分析(3): Fetch -- 1

文章分类:互联网

今天分析Fetch的过程。

Generate阶段产生了待爬行队列,并保存在segment/crawl_generate目录下。在Generate结束后,进入Fethch阶段,该阶段同Generate一样处于循环中,只要未达到指定的爬行深度,就继续。这一阶段是实际抓取网页的过程,它采用1个生产者(QueueFeeder)-多个消费者(FetchThread)的模型。QueueFeeder从输入中读取fetchlist并产生FetchItemQueue队列,该队列中包含了描述将要爬行的项目的FetchItem。这个队列是以要爬行的主机划分的,每个主机对应一个队列。但是在同一时刻,爬行FetchItem的数量受并发线程数的限制。

FetchThread从队列中不断地取出FetchItem,QueueFeeder向其中添加,这样就保持了一个动态平衡,直到FetchItemQueue中的FetchItem都被消耗完了。当队列中FetchItem的数量达到0,爬行结束。

爬行过程中,爬虫会考虑到对待爬行主机的礼貌(politeness),每个主机的爬行礼貌设置可以不同,比如最大并发请求和爬行间隔等。每个

FetchItemQueue对象还维护着一个名为inProgress的队列,用于保存正在抓取的FetchItem,当FetchItemQueue返回一个FetchItem后,便从这个队列中移出并放入inProgress队列中。FetchItemQueue还保存FetchItem最后向主机请求的时间。

当FetchThread从FetchItemQueue中请求新的将要抓取的项,队列返回一个项给FetchItem,或者如果由于礼貌性的原因项没有准备好,则返回null。如果队列中还有待爬行项,且所有的项都没有准备好,那么FetchThread就等待,直到有项目准备好或到达timeout时间。

Fetch阶段以org.apache.nutch.crawl.Fetcher类中的fetch(Path

segment, int threads, boolean parsing)方法作为入口,该方法的参数如下: Path segment segment的路径,这里是segment/crawl_generate int threads 用户指定的线程数 Boolean 是否进行解析

fetch方法中主要是对一个MapReduce任务进行配置,并运行这个任务。这个任务中只有Map而没有Reduce。输入路径是segment/crawl_generate,这就是在Generate阶段产生的待爬行队列。输入格式是InputFormat,它Fetcher中实现,继承了SequenceFileInputFormat但是特殊指出在于不将文件进行分片,而是整个文件作为一个分片送给Mapper。输出格式是FetcherOutputFormat,它是定义在org.apache.nutch.fetcher中,有三种不同的输出,分别根据value属于:(1)CrawlDatum,(2)Content还是(3)Parse的实例输出到不同的目录中。在最后,runJob(job)调用Fetcher类中的run方法启动任务: Java代码

1. // Fetcher中的run方法

2. public void run(RecordReader input,

3. OutputCollector output, Reporter reporter)

4. throws IOException { 5. this.output = output; 6. this.reporter = reporter;

7. this.fetchQueues = new FetchItemQueues(getConf()); 8. int threadCount = getConf().getInt(\ch\10);

9. ... ...

10. feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);

11. // feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);

12. feeder.start(); // 调用QueueFeeder的run方法

4.1 QueueFeeder

方法的前半部分初始化了一个QueueFeeder,QueueFeeder维护着一个FetchItemQueues类型的名为queues的队列,这个队列是主机名和它对应

FetchItemQueue的映射,所有相同主机名的抓取项(FetchItem)放入相同主机名对应的FetchItemQueue中。

接着,feeder调用start方法启动QueueFeeder的线程: Java代码

1. // QueueFeeder中的run方法 2. public void run() {

3. boolean hasMore = true; // reader中是否还有数据

4. int cnt = 0;

5. while (hasMore) {

6. int feed = size - queues.getTotalSize(); // 队列还有多少剩余空间

7. if (feed <= 0) {

8. // queues are full - spin-wait until they have some free

9. // space

10. // 队列满,等待 11. try {

12. Thread.sleep(1000); 13. } catch (Exception e) { 14. } 15. ;

16. continue;

17. } else { // 队列有剩余空间,向其中添加FetchItem

18. LOG.debug(\ ...\

19. while (feed > 0 && hasMore) { 20. try {

21. Text url = new Text();

22. CrawlDatum datum = new CrawlDatum();

23. hasMore = reader.next(url, datum); // next方法,如果成功读取了下一个,返回true,如果读到末尾,返回false

24. if (hasMore) {

25. queues.addFetchItem(url, datum);

26. cnt++; 27. feed--; 28. }

29. } catch (IOException e) { 30. ... ... 31. return; 32. } 33. ...... 34. }

该线程的工作比较简单,就是检查队列是否未满,如果未满且输入中还有数据,则将数据添加到FetchItemQueue中。添加到队列的过程分两部进行,先添加到FetchItemQueues,再添加到FetchItemQueue中。 这里在说明一下根据主机进行映射的概念。如果传入fetch方法的parsing参数是true,则以ip + protocol作为queueId存入队列映射,否则以hostname + protocol作为queueId作为queueId存入映射。上面代码中的getTotalSize方法返回totalSize,这个变量是当前FetchItemQueue中队列的FetchItem的个数。

FetchItemQueues和FetchItemQueue中包含的内容和对应关系如下:

QueueFeeder的run对应的流程图如下:

Nutch 1.0源代码分析(3): Fetch -- 2

文章分类:互联网 4.2 FetcherThread

在启动了QueueFeeder的线程后,Fetcher类run方法的主线程继续执行: Java代码

1. for (int i = 0; i < threadCount; i++) { // spawn threads 2. new FetcherThread(getConf()).start(); 3. }

4. ......

它根据用户指定的爬行线程数,开启了threadCount个爬行线程抓取网页。其后,当当前活动线程数>0时,就报告线程的状态。直到当前线程数为0时,run方法的主线程也退出。活跃线程数activeThreads是当前活跃的FetchThread的线程的个数。

接下来就分析用于抓取网页的类FetcherThread,该类同样是Fetcher类的一个内部类,继承了Thread,继而可以使用多线程。Fetcher类的run方法中启动了FetchThread的线程,调用其中的run方法,下面对其进行分析。 Java代码

1. public void run() {

2. activeThreads.incrementAndGet(); // count threads

3. FetchItem fit = null; 4. try {

5. while (true) {

6. fit = fetchQueues.getFetchItem(); // 使用Iterator从queues中取出第一个FetchItemQueue,再从queue中取出第一个FetchItem

7. if (fit == null) {

8. if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) { // queue没有准备好返回fetchitem,等待 9. LOG.debug(getName() + \g ...\

10. // spin-wait.

11. spinWaiting.incrementAndGet(); 12. try {

13. Thread.sleep(500); // 阻塞500ms后重新调度

14. } catch (Exception e) { 15. }

16. spinWaiting.decrementAndGet();

17. continue;

18. } else { // 该线程的所有工作都已完成,退出

19. // all done, finish this thread 20. return; 21. } 22. }

进入run方法后,首先为当前活跃线程数activeThreads加1,接下来进入一个死循环,进行网页的抓取。它首先调用getFetchItem方法,如果当前inProgress队列的大小>=每台主机允许的并发爬行线程数,则返回null,否则从所有FetchItemQueue中取出一个已经准备好(已经超过爬行间隔时间)的FetchItem,并加入到inProgress队列中。如果该值为空,则有两种可能,一种是出于礼貌性,队列不能返回FetchItem,则线程阻塞500毫秒后继续请求。另外一种可能是当前线程要的所有工作都已完成,则线程退出。 注意,同一主机对应的FetchItemQueue拥有相同的爬行间隔时间时间(得到爬行间隔时间的方法见后面的分析),在该queue中的一个FetchItem被爬行后,会设置这个队列的下一次请求时间,只有超过这个时间的队列才能返回新的FetchItem。这样便实现了对同一个主机的礼貌性访问控制。 接下来又进入一个循环,循环的终止条件是达到最大重定向次数或者不允许重定向。 Java代码

1. do {

2. redirecting = false;

3. Protocol protocol = this.protocolFactory .getProtocol(fit.url.toString());

4. RobotRules rules = protocol.getRobotRules(fit.url, 5. fit.datum); // 获取爬行规则

6. // rules实际上是RobotRuleSet类的实例,该类实现了RobotRules接口

7. if (!rules.isAllowed(fit.u)) { // unblock 8. fetchQueues.finishFetchItem(fit, true); 9. if (LOG.isDebugEnabled()) {

10. LOG.debug(\

11. }

12. output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED,

13. CrawlDatum.STATUS_FETCH_GONE); 14. continue;

15. }

16. if (rules.getCrawlDelay() > 0) {

17. if (rules.getCrawlDelay() > maxCrawlDelay) {// unblock

18. fetchQueues.finishFetchItem(fit, true);

19. output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);

20. continue; 21. } else {

22. FetchItemQueue fiq = fetchQueues .getFetchItemQueue(fit.queueID);

23. fiq.crawlDelay = rules.getCrawlDelay(); 24. } 25. }

首先根据要爬行的url确定其协议类型并初始化一个对应的Protocol对象,以下以http协议举例,则protocol初始化为org.apache.nutch.protocol包中的http类,其基类org.apache.nutch.protocol.http.api包中的HttpBase类。接着,生成一个RobotRule类型的对象,得到该url对应的站点的robot.txt文件,该文件中规定了爬虫的爬行限制。

如果不允许爬行,则调用finishFetchItem方法从inProgress集合中删除该fit。并设置该url对应datum的status为STATUS_FETCH_GONE,设置下一次爬行时间nextFetchTime。收集。 如果允许,则获取robot.txt中的规定的爬行间隔,如果大于配置文件中规定的最大爬行间隔,则跳过这个网页,调用finishFetchItem方法将这个FetchItem从inProgress队列中移出并报告一个错误,否则将这个FetchItem所在的主机队列FetchItemQueue的爬行间隔设为robot.txt中规定的值。 run方法继续执行: Java代码

1. ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);

2. ProtocolStatus status = output.getStatus(); 3. Content content = output.getContent(); 4. ParseStatus pstatus = null; 5. fetchQueues.finishFetchItem(fit);

protocol对象调用HttpBase类中的getProtocolOutput方法,该方法再调用Http类中的getResponse方法使用socket获取所请求文件的内容。返回一个ProtocolOutput类的对象。这个类中包含了对应协议请求的输出,包括

Content类的内容和ProtocolStatus类的协议状态。然后调用finishFetchItem方法将fit从inProgress队列中移出,解除对该主机队列的阻塞。 接着,通过协议状态码来进行下一步的工作。协议状态码是前面进行http请求获得响应后根据响应的内容填写的,包括WOULDBLOCK,SUCCESS等状态,每种状态处理的方式不同,以下介绍请求后成功获得一个网页的response之后的处理过程 Java代码

1. switch (status.getCode()) {

2. case ProtocolStatus.WOULDBLOCK: 3. ......

4. case ProtocolStatus.SUCCESS: // got a page

5. pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS); updateStatus(content.getContent().length); // 增加网页数量和字节数

6. if (pstatus != null && pstatus.isSuccess() && pstatus.getMinorCode() == parseStatus.SUCCESS_REDIRECT) {

7. String newUrl = pstatus.getMessage(); 8. int refreshTime = Integer.valueOf(pstatus

9. .getArgs()[1]); 10. Text redirUrl = handleRedirect( fit.url,fit.datum,

11. urlString, newUrl,

12. refreshTime < Fetcher.PERM_REFRESH_TIME,Fetcher.CONTENT_REDIR); 13. if (redirUrl != null) {

14. CrawlDatum newDatum = new CrawlDatum( CrawlDatum.STATUS_DB_UNFETCHED, fit.datum.getFetchInterval(),

15. fit.datum.getScore());

16. if (reprUrl != null) {

17. newDatum.getMetaData().put( Nutch.WRITABLE_REPR_URL_KEY,

18. new Text(reprUrl)); }

19. fit = FetchItem.create(redirUrl,

20. newDatum, byIP);

21. if (fit != null) {

22. FetchItemQueue fiq = fetchQueues .getFetchItemQueue(fit.queueID); fiq.addInProgressFetchItem(fit);

23. } else { // stop redirecting 24. redirecting = false; 25. } 26. } 27. }

28. break; 29. case...:

首先,调用output方法,该方法做了以下几个工作:(1)向链接对应的datum写入STATUS_FETCH_SUCCESS状态和当前时间作为抓取时间。向content对应的metadata中写入segment_name_key,score_key,fetch_status_key各自对应的值,其中score_key对应的得分由ScoringFilter算得。(2) 如果配置了parse则进行解析,解析的结果返回为ParseResult类的对象。(3)收集,其中content是未解析的内容以及收集。(4) 从content的metadata中拷贝得分,并存入parseData的metadata中。

在output方法返回后,调用updateStatus增加当前爬行的网页数和字节数。如果经过解析,网页中有重定向链接,则将重定向的链接加入到FetchItemQueue中。

在当重定向次数大于规定次数或者不允许重定向时,退出循环。 最后,将当前活跃进程数减1。网页抓取线程结束。

Nutch 1.0源代码分析(3): Fetch -- 3

文章分类:互联网 4.3 小结

最后我们从更高的一个视角来回顾爬行过程中的一些问题。 4.3.1 线程模型

Fetcher采用了经典的生产者消费者模型,生产者是QueueFeeder,用于根据主机/协议对向FetchItemQueues中不断地添加待爬行的FetchItem,只要FetchItemQueues队列中还有空间并且有输入数据,就添加,否则阻塞。消费者自然是FetchThread,它负责抓取网页。爬虫总共的线程数由配置决定,默认值是10,还可以指定对于同一个主机队列最大的爬行线程数,它们之和不超过总共的线程数。每个主机队列的最大线程数决定了每个主机队列中inProgress队列的大小,出于礼貌性一般对于同一个主机,只采用一个线程爬行。当待爬行的全部网页被爬行完后,爬行线程退出。 4.3.2 Mapper的分析

在了结了Fetch阶段的整个流程后,我们不禁要问,在分布式的情况下,不同的task node是怎样实现避免爬行相同网站的呢? 这要从Generate阶段说起,回顾Generate的第2次MapReduce,在Mapper输出的时候是以相同主机名作为分发到Reducer的依据的,因此各个Reducer生成了若干个文件,每个文件对应自己处理的那个主机对应的。在Fetcher的输入时,又不对文件进行分片,因此分到各个Mapper上的主机就不会重复,避免了重复爬行的情况。 4.3.3 Fetch阶段流程图

Nutch 1.0源代码分析(4): Parse

文章分类:互联网

Parse阶段和Fetch同属于一个循环层次中,在Fetch后由Parse阶段对抓取的内容进行解析。该阶段的入口是: Java代码

1. public void parse(Path segment) throws IOException

方法。其中就是对即将启动的Parse阶段的MapReduce任务进行配置,主要的配置如下: Java代码

1. FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));

2. job.set(Nutch.SEGMENT_NAME_KEY, segment.getName()); 3. job.setInputFormat(SequenceFileInputFormat.class); 4. job.setMapperClass(ParseSegment.class);

5. job.setReducerClass(ParseSegment.class); 6.

7. FileOutputFormat.setOutputPath(job, segment); 8. job.setOutputFormat(ParseOutputFormat.class); 9. job.setOutputKeyClass(Text.class);

10. job.setOutputValueClass(ParseImpl.class);

其中输入路径就是Fetch阶段保存下来的segment/content目录,其中键值对的类型是。Mapper和Reducer都在ParseSegment类中,输出的键值对类型是。输出格式是由ParseOutputFormat类实现的,它也是进行解析的重要部分。

map方法

在进行配置之后就启动任务,首先分析map方法: Java代码

1. public void map(WritableComparable key, Content content, 2. OutputCollector output, Reporter reporter)

3. throws IOException {

4. // convert on the fly from old UTF8 keys 5. if (key instanceof UTF8) { 6. newKey.set(key.toString()); 7. key = newKey;

8. } 9.

10. int status =

11. Integer.parseInt(content.getMetadata().get(Nutch.FETCH_STATUS_KEY)); // 获取fetch阶段的状态

12. if (status != CrawlDatum.STATUS_FETCH_SUCCESS) { 13. // content not fetched successfully, skip document 14. LOG.debug(\ successfully\15. return; 16. }

17. ParseResult parseResult = null; 18. try {

19. parseResult = new ParseUtil(getConf()).parse(content);

20. } catch (Exception e) {

21. LOG.warn(\ingifyException(e)); 22. return; 23. }

map首先获取Fetch阶段的状态,如果没有抓取成功,则返回,跳过这个网页,如果成功则调用ParseUtil的parse方法,对页面进行解析,该方法实现如下:

Java代码

1. parsers = this.parserFactory.getParsers(content.getContentType(),

2. content.getUrl() != null ? content.getUrl():\

首先根据网页的类型找出可以解析这个网页的全部parser,再依次调用每个parser的getParse方法对网页进行解析,只要有一个parser成功解析了网页就返回给map方法的parseResult。对于HTML来说,使用HtmlParser作为其解析器。 Java代码

1. public ParseResult getParse(Content content) { // 解析content

2. HTMLMetaTags metaTags = new HTMLMetaTags(); 3. ......

4. DocumentFragment root; 5. try {

6. byte[] contentInOctets = content.getContent();

7. InputSource input = new InputSource(new ByteArrayInputStream(contentInOctets)); 8. ......

9. input.setEncoding(encoding);

10. if (LOG.isTraceEnabled()) { LOG.trace(\11. root = parse(input); 12.}

13.......

14.HTMLMetaProcessor.getMetaTags(metaTags, root, base); // 获取网页的meta标签中的属性

15. if (LOG.isTraceEnabled()) {

16. LOG.trace(\ing()); 17. }

首先对输入的内容进行处理,行程InputSource类型的对象,传入parse方法,该方法调用第三方库Neko(还有另外一个TagSoup)将输入的数据形成DOM树,并将头节点返回给root。接着,调用HTMLMetaProcessor的getMetaTags获取网页中meta标签的属性。meta标签中包含了许多对搜索引擎爬虫有用的信息,指明了哪些网页可以检索,哪些链接不能跟踪等,在这个方法中用到的一些标签对应的变量如下:

noIndex 允许搜索引擎访问这个URL,但不允许搜索引擎索引它,且不允许在搜索结果页面显示

noFollow 允许搜索引擎访问这个URL,允许搜索引擎索引它,在搜索结果页面显示,但不传递PR值 noCache 是否缓存页面 refresh 是否重定向 baseHref

refreshTime 重定向跳转的时间 refreshHref 重定向到的链接

接下来就根据meta标签中规定的内容进行进一步的解析: Java代码

1. if (!metaTags.getNoIndex()) { // okay to index 2. StringBuffer sb = new StringBuffer();

3. if (LOG.isTraceEnabled()) { LOG.trace(\ }

4. utils.getText(sb, root); // extract text 提取网页内容

5. text = sb.toString(); 6. sb.setLength(0);

7. if (LOG.isTraceEnabled()) { LOG.trace(\; }

8. utils.getTitle(sb, root); // extract title 提取标题

9. title = sb.toString().trim(); 10.}

如果该网页允许索引,则从DOM树的title标签下取出网页的标题,并提取去掉标签后的网页内容。 Java代码

1. if (!metaTags.getNoFollow()) { // okay to follow links ArrayList l = new ArrayList(); // extract outlinks

2. URL baseTag = utils.getBase(root);

3. if (LOG.isTraceEnabled()) { LOG.trace(\; }

4. utils.getOutlinks(baseTag!=null?baseTag:base, l, root); outlinks = l.toArray(new Outlink[l.size()]); 5. if (LOG.isTraceEnabled()) {

6. LOG.trace(\ent.getUrl()); 7. } 8. }

如果这个页面可以跟踪出链,则从DOM中提取出所有的出链并赋给outlinks数组。 Java代码

1. if (metaTags.getRefresh()) { // 要重定向

2. status.setMinorCode(ParseStatus.SUCCESS_REDIRECT); 3. status.setArgs(new String[] {metaTags.getRefreshHref().toString(),

4. Integer.toString(metaTags.getRefreshTime())}); 5. }

如果在meta标签中发现需要重定向,则设置解析状态的minorCode为SUCCESS_REDIRECT,并设置好重定向需要的重定向链接和刷新时间参数。 Java代码

1. ParseData parseData = new ParseData(status, title, outlinks, 2. content.getMetadata(), metadata); ParseResult parseResult = ParseResult.createParseResult(content.getUrl(),

3. new ParseImpl(text, parseData));

最后,将解析结果构造成ParseResult类型,再进行html过滤之后,就返回构造的ParseResult。

这里需要说明,解析结果是层层封装的,最高层是ParseResult,它包含了所有的解析结果。几个包含解析结果的类的关系如下:

如上图所示,每个类中都包含了不同的解析出的内容,ParseResult类中的parseMap映射里包含的就是键值对,其中parseImpl是对parseData和parseText的封装。creatParseResult方法就是将键值对中需要的内容写入到parseMap中。

在封装好解析结果后,程序返回到ParseSegment类的map方法中: Java代码

1. for (Entry entry : parseResult) {

2. Text url = entry.getKey();

3. Parse parse = entry.getValue(); 4. ......

5. byte[] signature =

6. SignatureFactory.getSignature(getConf()).calculate(content, parse);

7. parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY,

8. StringUtil.toHexString(signature)); 9.

10. try {

11. scfilters.passScoreAfterParsing(url, content, parse);

12. } catch (ScoringFilterException e) { 13. ...... 14. } 15. }

16. output.collect(url, new ParseImpl(new ParseText(parse.getText()), parse.getData(), parse.isCanonical())); 17. }

使用迭代器依次从parseResult的parseMap中取出(对于HtmlParser来说,parseMap中只有一组键值对),对content计算MD5签名并存入到contentMeta中。随后又调用scroing filter的

passScoreAfterParsing方法,对于OPIC算法的filter来说,该方法就是将content的metadata中SCORE.KEY对应的得分拷贝到parseData中。 最后,收集map的结果

reduce方法

reduce方法非常简单,就是收集第一个对,并输出: Java代码

1. public void reduce(Text key, Iterator values,

2. OutputCollector output, Reporter reporter)

3. throws IOException {

4. output.collect(key, (Writable)values.next()); // collect first value 5. }

ParseOutputFormat

ParseOutputFormat

虽然看上去只是一个输出格式,但实际上它不仅担负了控制输出的功能还担负了解析页面的作用。该类最主要的方法就是getRecordWriter方法,它为输出返回一个writer。下面对其进行分析: Java代码

1. public RecordWriter getRecordWriter(FileSystem fs,

2. JobConf job, String name, Progressable progress) throws IOException { 3. ......

4. Path out = FileOutputFormat.getOutputPath(job);

5. Path text = new Path(new Path(out, ParseText.DIR_NAME), name);

6. Path data = new Path(new Path(out, ParseData.DIR_NAME), name);

7. Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);

8. final MapFile.Writer textOut = new MapFile.Writer(job, fs, text

9. .toString(), Text.class, ParseText.class, 10. CompressionType.RECORD, progress);

11. final MapFile.Writer dataOut = new MapFile.Writer(job, fs, data

12. .toString(), Text.class, ParseData.class, compType, progress);

13. final SequenceFile.Writer crawlOut = SequenceFile.createWriter(fs, job,

14. crawl, Text.class, CrawlDatum.class, compType, progress);

该方法初始化了三个输出路径,分别用于三种不同格式的输出,这三个输出将用于三种不同的目的。这三个输出分别对应的输出格式如下: 输出 格式 textOut dataOut crawlOut

该类返回一个RecordWriter类的对象,真正向输出写数据的就是这个类的write方法。接下来对write方法进行分析: Java代码

1. public void write(Text key, Parse parse) throws IOException {

2. String fromUrl = key.toString(); 3. String fromHost = null; 4. String toHost = null;

5. textOut.append(key, new ParseText(parse.getText(), parse.getThemeText())); 6.

7. ParseData parseData = parse.getData();

8. String sig = parseData.getContentMeta() .get(Nutch.SIGNATURE_KEY); 9. if (sig != null) {

10. byte[] signature = StringUtil.fromHexString(sig); 11. if (signature != null) {

12. // append a CrawlDatum with a signature 13. CrawlDatum d = new CrawlDatum(

14. CrawlDatum.STATUS_SIGNATURE, 0); 15. d.setSignature(signature);

16. crawlOut.append(key, d); } 17. }

write方法的输入就是reduce方法收集的对,相同的url被分发到同一个reducer上处理。fromUrl就是输入进来的key。这个fromUrl是相对后面解析出来的出链而言的。

首先向textOut中添加输出。接着再从contentMeta中取出网页的签名,如果不为空则将它写成字节流,并添加到key对应的crawlDatum中,将对添加到crawlOut输出。 Java代码

1. ParseStatus pstatus = parseData.getStatus(); 2. if (pstatus != null && pstatus.isSuccess()

3. && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) { // 重定向 参数在HtmlParser中设置

4. String newUrl = pstatus.getMessage(); int refreshTime = Integer.valueOf(pstatus.getArgs()[1]); newUrl = normalizers.normalize(newUrl, 5. URLNormalizers.SCOPE_FETCHER); 6. newUrl = filters.filter(newUrl); 7. String url = key.toString();

8. if (newUrl != null && !newUrl.equals(url)) {

9. String reprUrl = URLUtil.chooseRepr(url, newUrl,

10. refreshTime < Fetcher.PERM_REFRESH_TIME); // 根据方法中的算法在原始url和重定向到的url中选出一个代表url(representation url)

11. CrawlDatum newDatum = new CrawlDatum();

12. newDatum.setStatus(CrawlDatum.STATUS_LINKED);

13. if (reprUrl != null && !reprUrl.equals(newUrl)) {

14. newDatum.getMetaData().put( 15. Nutch.WRITABLE_REPR_URL_KEY,

16. new Text(reprUrl)); 17. }

18. crawlOut.append(new Text(newUrl), newDatum); // 添加新解析出的 19. } 20.}

接下来,程序获取parseData中的parseStatus,如果状态中表明需要重定向(上文介绍过在哪里设置需要重定向的标志)则继续获取重定向到的链接newUrl和刷新时间。如果newUrl和key中的url不同,则调用Util.chooseRepr方法根据该方法的算法(参

见 http://help.yahoo.com/l/nz/yahooxtra/search/webcrawler/slurp-11.html)算则出key和newUrl中的一个代表链接。如果代表连接不是newUrl,则将newUrl的crawlDatum的metaData中WRITABLE_REPR_URL_KEY对应的值写成代表连接reprUrl。并将写入到crawlOut的输出中。 接下来的部分收集出链为后面的crawlDb的更新作准备: Java代码

1. Outlink[] links = parseData.getOutlinks();

2. int outlinksToStore = Math.min(maxOutlinks, links.length); 3. ......

4. List> targets = new ArrayList>(outlinksToStore);

5. List outlinkList = new ArrayList( outlinksToStore);

首先取出在前面解析出来的outlink,放入links数组中。 Java代码

1. for (int i = 0; i < links.length

2. && validCount < outlinksToStore; i++) {

3. String toUrl = links[i].getToUrl();

4. // ignore links to self (or anchors within the page) 5. if (fromUrl.equals(toUrl)) { 6. continue; 7. }

8. if (ignoreExternalLinks) { // 忽略链向其它主机的链接 9. try {

10. toHost = new URL(toUrl).getHost().toLowerCase(); 11. } catch (MalformedURLException e) { 12. toHost = null; 13. }

14. if (toHost == null || !toHost.equals(fromHost)) { // external

15. // links

16. continue; // skip it 17. } 18. } 19. try {

20. toUrl = normalizers.normalize(toUrl,

21. URLNormalizers.SCOPE_OUTLINK); // normalize the url

22. toUrl = filters.filter(toUrl); // filter the url 23. if (toUrl == null) { 24. continue; 25. }

26. } catch (Exception e) { 27. continue; 28. }

29. CrawlDatum target = new CrawlDatum(

30.CrawlDatum.STATUS_LINKED, interval); // interval 爬行间隔 31. Text targetUrl = new Text(toUrl); 32. try {

33. scfilters.initialScore(targetUrl, target); 34. } catch (ScoringFilterException e) {

35. LOG.warn(\

36. + \;

37. target.setScore(0.0f); 38. } 39.

40. targets.add(new SimpleEntry(targetUrl, target)); // 将出链的加入到targets List中

41. outlinkList.add(links[i]); // 将出链加入到outlinkList的List中 42. validCount++; 43.}

接下来进入到循环中,直到所有的出链遍历完或者达到存储限制。每次循环从outlinks中取出一个出链,如果限制了链向其它主机的链接,则忽略这样的链接。对toUrl进行正规化和过滤,如果不能通过这些检查则跳到下一次循环。对与能够通过检查的链接,则对其赋予初始得分后将链接及其对应的

crawlDatum以的形式加入到targets列表中。将url加入到linkList中。

接着,调用scoring filter的distributeScoreToOutlinks为出链分发得分,在OPIC算法中,分发的方法是父链接的得分/出链的个数。 Java代码

1. for (Entry target : targets) {

2. crawlOut.append(target.getKey(), target.getValue()); 3. }

将带得分的所有通过规范性和过滤检查的出链写入到crawlOut中。 Java代码

1. Outlink[] filteredLinks = outlinkList

2. .toArray(new Outlink[outlinkList.size()]);

3. parseData = new ParseData(parseData.getStatus(), parseData 4. .getTitle(), filteredLinks, parseData.getContentMeta(),

5. parseData.getParseMeta());

6. dataOut.append(key, parseData); // 向dataOut中添加

将通过过滤的出链存入数组filteredLinks中,并将parseData重新生成。向dataOut中写入这些 Java代码

1. if (!parse.isCanonical()) {

2. CrawlDatum datum = new CrawlDatum();

3. datum.setStatus(CrawlDatum.STATUS_FETCH_SUCCESS); 4. String timeString = parse.getData().getContentMeta().get(

5. Nutch.FETCH_TIME_KEY); 6. try {

7. datum.setFetchTime(Long.parseLong(timeString)); 8. } catch (Exception e) {

9. LOG.warn(\10. datum.setFetchTime(System.currentTimeMillis()); 11. }

12. crawlOut.append(key, datum); 13.}

最后,如果是子链接,向crawlOut中添加。write方法结束。

小节

由于ParseOutputFormat设计三个输出路径,每个输出路径又不止一次输出,所以在这有必要对所有的输出进行一下小节: textOut:输出的内容:根据reduce输出的中的parse得到ParseText,以此形成输出。

dataOut:write方法最后的带出链得分的。 crawlOut:(1)reduce输出的key 和带有它对应的网页签名的。(2)如果key的网页要重定向,则输出重定向的。(3)通过正规化和过滤的带得分的出链的

Nutch 1.0源代码分析(5): Index

文章分类:互联网

分析了索引部分之后一直忘了放到博客上来,今天补上。

索引这部分的操作相对比较简单,应用Lucene提供的接口实现索引的功能。在进行了网页数据库和链接数据库的更新之后,crawl程序跳出循环,进行爬行阶段的最后一项工作——索引。索引这一过程在crawl的入口是: Java代码

1. indexer.index(indexes, crawlDb, linkDb, Arrays.asList(HadoopFSUtil.getPaths(fstats)));

该方法位于org.apache.nutch.indexer中,功能是配置并启动索引阶段的MapReduce任务。该方法调用initMRJob方法对MapReduce任务进行配置,配置的内容为:输入路径(每轮爬行中保存的爬行和解析目录,链接数据库目录),Mapper和Reducer所在的类(IndexerMapReduce),输出格式

(IndexerOutputFormat),输出键值对的类型()。 在initMRJob中还对写入索引的域进行了配置,之后启动MapReduce任务。 首先分析Map方法: Java代码

1. public void map(Text key, Writable value,

2. OutputCollector output, Reporter reporter) throws IOException {

3. output.collect(key, new NutchWritable(value)); 4. }

Map的内容很简单,就是将输入原样输出,但是以url作为键分发到reducer上,这样,几种不同输入目录中同一个url对应的不同类型的数据就分发到同一个reducer上了。 接下来逐步分析reduce方法: Java代码

1. public void reduce(Text key, Iterator values, 2. OutputCollector output, Reporter reporter) 3. throws IOException {

4. while (values.hasNext()) { // 判别类型,values 中有对应于同一url的几种数据

5. final Writable value = values.next().get(); // unwrap 6. if (value instanceof Inlinks) { 7. inlinks = (Inlinks)value;

8. } else if (value instanceof CrawlDatum) { 9. ...... 10.} //end of while

reduce部分首先将收集的值判断所属类型,由于输入时values中有各种类型的值,因此要分门别类的存储,以便后面的使用,随后,将存储的值填入索引中:

Java代码

1. // add segment, used to map from merged index back to segment files

2. doc.add(\

3. // add digest, used by dedup

4. doc.add(\5. ......

6. doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);

7. boost = this.scfilters.indexerScore(key, doc, dbDatum, 8. fetchDatum, parse, inlinks, boost); 9. doc.add(\10.output.collect(key, doc);

调用index的filter向索引中加入其它关心的域,默认采用BasicIndexing作为filter,该类的filter方法将一些基本的field,如title等加入到doc中。调用scroing filter的indexerScore方法计算索引阶段的得分,并将此得分写入boost值,并写入索引。 最后,收集键值对。 索引阶段结束。

本文来源:https://www.bwwdw.com/article/ncqr.html

Top