Nutch 1.0源代码分析
更新时间:2024-03-30 00:39:01 阅读量: 综合文库 文档下载
- nutcher推荐度:
- 相关推荐
Nutch 1.0源代码分析(1): Crawl初始化与Injector
文章分类:互联网
从今天开始陆续对Nutch 1.0的工作过程进行分析。从Crawl为起点,先分析爬虫从爬行到索引的过程,再分析查询检索的过程。如有错误,欢迎批评指正!
Crawl类是Nutch爬虫中的一个核心类,它的主要方法就是该类中的main方法,该方法包含了爬虫的整个运行阶段,包括Inject(将初始URL加入到网页库CrawlDb中),Generate(产生待爬行队列),Fetch(抓取网页)和Index(索引)。这里通过分析main方法的执行过程分析nutch爬虫从建立网页库到建立索引的全过程。Crawl类位于org.apache.nutch.crawl包中,接下来就对main方法进行分析。 1.初始化阶段
根据nutch-default.xml,nutch-site.xml,nutch-tool.xml 3个配置文件和用户在命令行输入的参数,配置和初始化爬行深度、线程数、爬行根目录、topN、crawlDb、linkDb、segments、indexes等目录,并初始化injector、generator、fetcher、parser、indexer等。 2.Inject
Inject阶段将初始URL进行合法性检查(UrlNomalizer)如:统一大小写,检查是否为合法URL等和过滤(UrlFilter),如过滤掉用户指定的不爬行的链接之后注入到网页数据库crawlDb中。调用inject(crawlDb, rootUrlDir)方法,该方法在org.apache.nutch.crawl.Injector类中,下面对其进行分析。 inject方法的两个参数,Path crawlDb代表crawlDb的路径,这里是crawl/crawlDb;Path rootUrlDir 代表初始URL的路径。
Java代码
1. // org.apache.nutch.crawl.Injector
2. public void inject(Path crawlDb, Path urlDir) throws IOException {
3. // crawlDb 网页数据库目
录 // urlDir 种子URL目录 4. ... ...
5. Path tempDir =
6. new Path(getConf().get(\ \eger.MAX_VALUE))); 7.
8. // map text input file to a
10. LOG.info(\entries.\
11. }
12. JobConf sortJob = new NutchJob(getConf()); 13. sortJob.setJobName(\
14. FileInputFormat.addInputPath(sortJob, urlDir); 15. sortJob.setMapperClass(InjectMapper.class); 16.
17. FileOutputFormat.setOutputPath(sortJob, tempDir);
18. sortJob.setOutputFormat(SequenceFileOutputFormat.class); 19. sortJob.setOutputKeyClass(Text.class);
20. sortJob.setOutputValueClass(CrawlDatum.class);
21. sortJob.setLong(\illis());
22. JobClient.runJob(sortJob); 23. ... ...
程序的前半部分将输入文件map成
那么在Map阶段具体都做了哪些事呢?需要进一步分析InjectMapper类。该类的configure方法对urlNormalizer、interval、scfiliter、scoreInjected等域进行初始化,为后面的map做准备。其中interval是网页抓取的时间间隔,默认情况下是30天,scfilter是当前nutch使用的ScoringFilter,scoreInjected是inject到crawlDb中的链接得分,默认是1.0。接下来是最重要的map方法: Java代码
1. public void map(WritableComparable key, Text value,
2. OutputCollector
3. throws IOException {
4. String url = value.toString(); // value is line of text 5. try {
6. url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
7. url = filters.filter(url); // filter the url
8. } catch (Exception e) {
9. if (LOG.isWarnEnabled()) { LOG.warn(\\
10. url = null; 11. }
12. if (url != null) { // if it passes
13. value.set(url); // collect it
14. CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, interval);
15. datum.setFetchTime(curTime); 16. datum.setScore(scoreInjected); 17. try {
18. scfilters.injectedScore(value, datum); 19. } catch (ScoringFilterException e) { 20. if (LOG.isWarnEnabled()) {
21. LOG.warn(%url +
22. \);
23. }
24. datum.setScore(scoreInjected); 25. }
26. output.collect(value, datum); // 一个URL对应一个状态信息,key是URL,值是状态信息 27. } 28. }
这个map方法的主要功能是将用户输入的初始URL从文本转换为
ScoringFilter中指定了在inject阶段怎样计算注入链接的得分则使用这个得分,否则使用默认的1.0)、抓取时间。最后,收集url和与它对应的爬行状态。 程序返回到inject方法中继续执行: Java代码
1. // merge with existing crawl db 2. if (LOG.isInfoEnabled()) {
3. LOG.info(\); 4. }
5. JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
6. FileInputFormat.addInputPath(mergeJob, tempDir); 7. mergeJob.setReducerClass(InjectReducer.class); 8. JobClient.runJob(mergeJob);
9. CrawlDb.install(mergeJob, crawlDb); 10.
11. // clean up
12. FileSystem fs = FileSystem.get(getConf()); 13. fs.delete(tempDir, true);
14. if (LOG.isInfoEnabled()) { LOG.info(\15.
16. } // inject方法结束
这部分首先调用CrawlDb的createJob静态方法对接下来的将injected URL合并到crawlDb的MapReduce任务进行配置。createJob将FileInputPath设置为crawl/crawlDb/current,Mapper类设置为CrawlDbFilter,Reducer类设置为CrawlDbReducer,输出路径(outputPath)设置为crawl/crawlDb/下的一个以随机数命名的目录,输出的键值对类型是
在该方法后,inject方法继续对接下来的MapReduce任务进行配置,增加一个输入路径tempDir(即上次Map任务的输出。这样,Map任务实际有两个输入路径),设置Reducer类为InjectReducer(此处待确认:覆盖creatJob中设置的Reducer?????)。
Map阶段CrawlDbFilter的作用是将crawl/crawldb/current和上一步Map处理(读入初始URL并map)后的
Reduce阶段InjectReducer的任务是将已存在于current中的网页数据库和新加入在outputPath中的数据库进行合并。如果新加入的网页已存在于current中,则不修改其状态,否则将新加入链接datum的状态从
STATUS_INJECTED改为STATUS_DB_UNFETCHED。这样就完成了新旧数据库的合并 最后,通过CrawlDb.install(mergeJob, crawlDb)方法将tempDir删除。将新的数据库命名为current。
至此,Inject阶段的任务完成,该阶段数据处理的流程图如下:
Nutch 1.0源代码分析(2): Generate
文章分类:互联网 在Inject过后,程序返回到crawl的main方法中继续执行,接下来进入到一个循环,循环的终止条件是达到指定的爬行深度。在循环中依次进行generate和fetch两个操作,每次循环产生一个segment,位于crawl/segments下,以generate方法调用的时间作为这次循环产生目录的名称。首先分析Generate操作。
Generator类位于org.apache.nutch.crawl中,其中的核心方法是public Path generate(Path dbDir, Path segments, int numLists, long topN, long curTime)方法,作用是根据链接得分,产生包含topN个链接的待爬行队列fethclist,并更新网页数据库。crawl正是调用了这个方法进入Generate阶段。其中各个参数的含义是:
Path dbDir crawldb的路径 Path segments segments的路径 int numLists Reduce任务的数量
long topN 用户指定的TopN数量,即每轮爬行选择前TopN个URL加入到fetchlist中
long curTime 调用generate方法的时间
上述方法代码如下: Java代码
1. public Path generate(Path dbDir, Path segments, int numLists, long topN, long curTime) throws IOException { 2.
3. JobConf job = new NutchJob(getConf());
4. boolean filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
5. return generate(dbDir, segments, numLists, topN, curTime, filter, false); 6. }
它通过配置文件读取是否要进行URL过滤,默认为过滤。然后调用
generate(dbDir, segments, numLists, topN, curTime, filter, false)方法,这个方法是真正进行generate的地方,它产生了一个位于系统临时目录下的目录,命名为tempDir。 Java代码
1. public Path generate(Path dbDir, Path segments,
2. int numLists, long topN, long curTime, boolean filter,
3. boolean force) 4. throws IOException { 5. ... ...
6. JobConf job = new NutchJob(getConf());
7. job.setJobName(\8.
9. if (numLists == -1) { // for politeness make
10. numLists = job.getNumMapTasks(); // a partition per fetch task
11. // 获取Reduce任务的数量,默认是1 12. }
13. if (\ts != 1) { // 如果是local模式,则将reduce的数量置1 14. // override
15. LOG.info(\actly one partition.\16. numLists = 1; 17. }
18. ... ...
19. FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME)); // crawl/crawldb/current
20. job.setInputFormat(SequenceFileInputFormat.class); 21.
22. job.setMapperClass(Selector.class);
23. job.setPartitionerClass(Selector.class); 24. job.setReducerClass(Selector.class); 25.
26. FileOutputFormat.setOutputPath(job, tempDir);
27. job.setOutputFormat(SequenceFileOutputFormat.class); 28. job.setOutputKeyClass(FloatWritable.class);
29. job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
30. job.setOutputValueClass(SelectorEntry.class); 31. try {
32. JobClient.runJob(job); 33. } catch (IOException e) {
34. LockUtil.removeLockFile(fs, lock); 35. throw e; 36.}
程序首先对几个路径进行配置,然后获取Reducer的数量,如果hadoop当
前的模式为local,则将Reducer的数量numLists设为1。 3.1. 第1次MapReduce
程序接下来将对第1个MapReduce任务进行配置。输入路径InputPath为当前的网页数据库(crawl/crawldb/current),输出路径为临时目录tempDir。Mapper、Partitioner、Reducer类都是Selector类。 首先分析Map任务: Java代码
1. public void map(Text key, CrawlDatum value, OutputCollector
2. throws IOException { 3. Text url = key; 4. if (filter) {
5. // If filtering is on don't generate URLs that don't pass URLFilters 6. try {
7. if (filters.filter(url.toString()) == null) // 过滤URL
8. return;
9. } catch (URLFilterException e) { 10. if (LOG.isWarnEnabled()) {
11. LOG.warn(\etMessage()
12. + \13. } 14. } 15. }
16. CrawlDatum crawlDatum = value; 17.
18. // check fetch schedule
19. if (!schedule.shouldFetch(url, crawlDatum, curTime)) { // 默认情况下调用DefaultFetchSchedule继承的shouldFetch方法, 20. // 根据crawlDatum的时间和当前时间相比,如果比当前时间更新,则
21. // 可以考虑加入fetch list
22. // \当前时间\可以进行配置,不一定就是实际中的当前时间
23. LOG.debug(\e=\24. return; 25. } 26.
27. LongWritable oldGenTime = (LongWritable)crawlDatum.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
28. if (oldGenTime != null) { // awaiting fetch & update 29. if (oldGenTime.get() + genDelay > curTime) // still wait for update
30. return; 31. }
32. float sort = 1.0f; 33. try {
34. sort = scfilters.generatorSortValue((Text)key, crawlDatum, sort); // 根据当前使用的ScoringFilter计算该URL在Generate阶段的得分
35. } catch (ScoringFilterException sfe) { 36. if (LOG.isWarnEnabled()) {
37. LOG.warn(\key + \38. } 39. }
40. // sort by decreasing score, using DecreasingFloatComparator
41. sortValue.set(sort);
42. // record generation time
43. crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
44. entry.datum = crawlDatum; 45. entry.url = (Text)key;
46. output.collect(sortValue, entry); // invert for sort by score 47. }
map首先对输入的URL进行过滤,如果被过滤掉则返回,否则继续执行。接下来,调用shouldFetch方法通过对比当前时间和数据库中该链接的
fetchtime属性的间隔来判断是否考虑抓取该链接,如果不到应该的时间间隔则不怕,但是如果到了,也不一定就抓取,还要看后面的处理。接下来通过调用ScoringFilter的generatorSortValue方法判断在Generate阶段的得分。并将这个得分作为键收集将datum和url封装进SelectorEntry类型的entry对象中,并将entry作为值收集。第1次Map后的结果就是
接下来分析第一次MapReduce任务的Partitioner: Java代码
1. public int getPartition(FloatWritable key, Writable value, 2. int numReduceTasks) {
3. return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
4. numReduceTasks); 5. }
它根据url代表的主机名进行分区,将同一个主机上的URL交给同一个Reducer处理,这样体现了对该站点的礼貌性(politeness)。第一次MapReduce的Reducer如下: Java代码
1. public void reduce(FloatWritable key, Iterator
2. OutputCollector
3. Reporter reporter) 4. throws IOException { 5.
6. while (values.hasNext() && count < limit) { 7. SelectorEntry entry = values.next(); 8. Text url = entry.url;
9. String urlString = url.toString(); 10. URL u = null; 11.
12. // skip bad urls, including empty and null urls 13. try {
14. u = new URL(url.toString());
15. } catch (MalformedURLException e) {
16. LOG.info(\
17. continue; 18. } 19.
20. String host = u.getHost(); 21. host = host.toLowerCase(); 22. String hostname = host; 23. ... ... 24. try {
25. urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
26. host = new URL(urlString).getHost(); 27. } catch (Exception e) {
28. LOG.warn(\g (\
29. StringUtils.stringifyException(e) + \30. continue; 31. } 32.
33. // only filter if we are counting hosts 34.// 对同一个host,产生url的最大个数,通过配置文件获得,如果为-1,则没有限// 制
35. if (maxPerHost > 0) { 36.
37. IntWritable hostCount = hostCounts.get(host); 38. if (hostCount == null) {
39. hostCount = new IntWritable(); 40. hostCounts.put(host, hostCount); 41. }
42. // increment hostCount
43. hostCount.set(hostCount.get() + 1); 44. // skip URL if above the limit per host. 45. if (hostCount.get() > maxPerHost) {
46. if (hostCount.get() == maxPerHost + 1) {
47. // remember the raw hostname that is maxed out 48. maxedHosts.add(hostname); 49. if (LOG.isInfoEnabled()) {
50. LOG.info(\axPerHost + \51. } 52. }
53. continue; 54. } 55. }
56. output.collect(key, entry);
57. // Count is incremented only when we keep the URL 58. // maxPerHost may cause us to skip it. 59. count++; 60. } 61. }
limit是每个Reducer最大需要处理的链接的数量,由limit =
job.getLong(CRAWL_TOP_N,Long.MAX_VALUE)/job.getNumReduceTasks()得到,也就是说,是由topN/Reducer的数量决定的。只要每个Reducer没有达到这个最大限度,就从输入的value中取出链接。如果用户配置了
generate.max.per.host属性并设置为正值,则会限制同一个host中产生的链
接数。程序会过滤掉超过指定数目的属于同一主机的URL。reduce方法最终收集通过过滤,且符合数量要求的
由于每次收集的键值对的数量是受limit限制的,而且reduce输入的value又是根据链接的得分值从高到低排序的,所以当达到limit的限制时,低分的url就被略掉了。
第1次MapReduce任务的输出还是以<得分,链接信息>的形式保存的。 3.2. 第2次MapReduce
在进行第1次MapReduce后,程序返回generate方法中,并进行第2次MapReduce: Java代码
1. ... ...
2. job = new NutchJob(getConf());
3. job.setJobName(\4.
5. job.setInt(\nt()); 6.
7. FileInputFormat.addInputPath(job, tempDir);
8. job.setInputFormat(SequenceFileInputFormat.class); 9.
10. job.setMapperClass(SelectorInverseMapper.class); 11. job.setMapOutputKeyClass(Text.class);
12. job.setMapOutputValueClass(SelectorEntry.class); 13. job.setPartitionerClass(PartitionUrlByHost.class); 14. job.setReducerClass(PartitionReducer.class); 15. job.setNumReduceTasks(numLists); 16.
17.// output是crawl/segments/yyyyMMddHHmmss/crawl_generate 18. FileOutputFormat.setOutputPath(job, output); job.setOutputFormat(SequenceFileOutputFormat.class); 19. job.setOutputKeyClass(Text.class);
20. job.setOutputValueClass(CrawlDatum.class);
21. job.setOutputKeyComparatorClass(HashComparator.class);
此次MapReduce的输入是第一次MapReduce输出到的临时文件,输出路径是segment/目录下的crawl_generate。
下面来分析Mapper的任务,Mapper类是SelectorInverseMapper: Java代码
1. public static class SelectorInverseMapper extends MapReduceBase implements Mapper
3. public void map(FloatWritable key, SelectorEntry value, OutputCollector
4. SelectorEntry entry = (SelectorEntry)value; 5. output.collect(entry.url, entry); 6. } 7. }
map方法的作用是将第一次MapReduce输出的以<链接得分,链接信息>为形式的文件转变为
1. public static class PartitionReducer extends MapReduceBase 2. implements Reducer
4. public void reduce(Text key, Iterator
5. OutputCollector
6. while (values.hasNext()) {
7. SelectorEntry entry = values.next(); 8. output.collect(entry.url, entry.datum); 9. } 10. } 11.}
Reducer的作用也十分简单,就是将Mapper的输出进一步转化,形成crawlDb中数据的保存形式 selectorEntry>到 在第二次MapReduce执行后,紧接着进行第三次MapReduce任务,这次的目的是更新tempDir中的generate时间信息,并输出到tempDir2中: Java代码 1. Path tempDir2 = 2. new Path(getConf().get(\3. \; 4. 5. job = new NutchJob(getConf()); 6. job.setJobName(\ 7. job.setLong(Nutch.GENERATE_TIME_KEY, generateTime); 8. FileInputFormat.addInputPath(job, tempDir); 9. FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME)); // 两个InputPath 10. job.setInputFormat(SequenceFileInputFormat.class); 11. job.setMapperClass(CrawlDbUpdater.class); 12. job.setReducerClass(CrawlDbUpdater.class); 13. job.setOutputFormat(MapFileOutputFormat.class); 14. job.setOutputKeyClass(Text.class); 15. job.setOutputValueClass(CrawlDatum.class); 16. FileOutputFormat.setOutputPath(job, tempDir2); 此次MapReduce以第一次MapReduce的输出tmpDir(里面存放着 1. public void map(WritableComparable key, Writable value, OutputCollector 2. if (key instanceof FloatWritable) { // tempDir source key是得分,value是该得分对应的所有selector entry 3. SelectorEntry se = (SelectorEntry)value; 4. output.collect(se.url, se.datum); 5. } else { 6. output.collect((Text)key, (CrawlDatum)value); // key是URL,value是datum 7. } 8. } 它将两个输入路径中的不同数据形式统一为 1. public void reduce(Text key, Iterator 2. while (values.hasNext()) { 3. CrawlDatum val = values.next(); 4. if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) { 5. LongWritable gt = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY); 6. genTime.set(gt.get()); 7. if (genTime.get() != generateTime) { 8. orig.set(val); 9. genTime.set(0L); 10. continue; 11. } 12. } else { 13. orig.set(val); 14. } 15. } 16. if (genTime.get() != 0L) { 17. orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime); 18. } 19. output.collect(key, orig); 20. } Reduce任务的功能是将输入的 在第三次MapReduce执行后,generate方法将tempDir2命名为当前的网页数据库,删除旧的crawlDb,返回segment对象。Generate阶段执行完毕。 这一阶段的数据流程图如下: Nutch 1.0源代码分析(3): Fetch -- 1 文章分类:互联网 今天分析Fetch的过程。 Generate阶段产生了待爬行队列,并保存在segment/crawl_generate目录下。在Generate结束后,进入Fethch阶段,该阶段同Generate一样处于循环中,只要未达到指定的爬行深度,就继续。这一阶段是实际抓取网页的过程,它采用1个生产者(QueueFeeder)-多个消费者(FetchThread)的模型。QueueFeeder从输入中读取fetchlist并产生FetchItemQueue队列,该队列中包含了描述将要爬行的项目的FetchItem。这个队列是以要爬行的主机划分的,每个主机对应一个队列。但是在同一时刻,爬行FetchItem的数量受并发线程数的限制。 FetchThread从队列中不断地取出FetchItem,QueueFeeder向其中添加,这样就保持了一个动态平衡,直到FetchItemQueue中的FetchItem都被消耗完了。当队列中FetchItem的数量达到0,爬行结束。 爬行过程中,爬虫会考虑到对待爬行主机的礼貌(politeness),每个主机的爬行礼貌设置可以不同,比如最大并发请求和爬行间隔等。每个 FetchItemQueue对象还维护着一个名为inProgress的队列,用于保存正在抓取的FetchItem,当FetchItemQueue返回一个FetchItem后,便从这个队列中移出并放入inProgress队列中。FetchItemQueue还保存FetchItem最后向主机请求的时间。 当FetchThread从FetchItemQueue中请求新的将要抓取的项,队列返回一个项给FetchItem,或者如果由于礼貌性的原因项没有准备好,则返回null。如果队列中还有待爬行项,且所有的项都没有准备好,那么FetchThread就等待,直到有项目准备好或到达timeout时间。 Fetch阶段以org.apache.nutch.crawl.Fetcher类中的fetch(Path segment, int threads, boolean parsing)方法作为入口,该方法的参数如下: Path segment segment的路径,这里是segment/crawl_generate int threads 用户指定的线程数 Boolean 是否进行解析 fetch方法中主要是对一个MapReduce任务进行配置,并运行这个任务。这个任务中只有Map而没有Reduce。输入路径是segment/crawl_generate,这就是在Generate阶段产生的待爬行队列。输入格式是InputFormat,它Fetcher中实现,继承了SequenceFileInputFormat但是特殊指出在于不将文件进行分片,而是整个文件作为一个分片送给Mapper。输出格式是FetcherOutputFormat,它是定义在org.apache.nutch.fetcher中,有三种不同的输出,分别根据value属于:(1)CrawlDatum,(2)Content还是(3)Parse的实例输出到不同的目录中。在最后,runJob(job)调用Fetcher类中的run方法启动任务: Java代码 1. // Fetcher中的run方法 2. public void run(RecordReader 3. OutputCollector 4. throws IOException { 5. this.output = output; 6. this.reporter = reporter; 7. this.fetchQueues = new FetchItemQueues(getConf()); 8. int threadCount = getConf().getInt(\ch\10); 9. ... ... 10. feeder = new QueueFeeder(input, fetchQueues, threadCount * 50); 11. // feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2); 12. feeder.start(); // 调用QueueFeeder的run方法 4.1 QueueFeeder 方法的前半部分初始化了一个QueueFeeder,QueueFeeder维护着一个FetchItemQueues类型的名为queues的队列,这个队列是主机名和它对应 FetchItemQueue的映射,所有相同主机名的抓取项(FetchItem)放入相同主机名对应的FetchItemQueue中。 接着,feeder调用start方法启动QueueFeeder的线程: Java代码 1. // QueueFeeder中的run方法 2. public void run() { 3. boolean hasMore = true; // reader中是否还有数据 4. int cnt = 0; 5. while (hasMore) { 6. int feed = size - queues.getTotalSize(); // 队列还有多少剩余空间 7. if (feed <= 0) { 8. // queues are full - spin-wait until they have some free 9. // space 10. // 队列满,等待 11. try { 12. Thread.sleep(1000); 13. } catch (Exception e) { 14. } 15. ; 16. continue; 17. } else { // 队列有剩余空间,向其中添加FetchItem 18. LOG.debug(\ ...\ 19. while (feed > 0 && hasMore) { 20. try { 21. Text url = new Text(); 22. CrawlDatum datum = new CrawlDatum(); 23. hasMore = reader.next(url, datum); // next方法,如果成功读取了下一个 24. if (hasMore) { 25. queues.addFetchItem(url, datum); 26. cnt++; 27. feed--; 28. } 29. } catch (IOException e) { 30. ... ... 31. return; 32. } 33. ...... 34. } 该线程的工作比较简单,就是检查队列是否未满,如果未满且输入中还有数据,则将数据添加到FetchItemQueue中。添加到队列的过程分两部进行,先添加到FetchItemQueues,再添加到FetchItemQueue中。 这里在说明一下根据主机进行映射的概念。如果传入fetch方法的parsing参数是true,则以ip + protocol作为queueId存入队列映射,否则以hostname + protocol作为queueId作为queueId存入映射。上面代码中的getTotalSize方法返回totalSize,这个变量是当前FetchItemQueue中队列的FetchItem的个数。 FetchItemQueues和FetchItemQueue中包含的内容和对应关系如下: QueueFeeder的run对应的流程图如下: Nutch 1.0源代码分析(3): Fetch -- 2 文章分类:互联网 4.2 FetcherThread 在启动了QueueFeeder的线程后,Fetcher类run方法的主线程继续执行: Java代码 1. for (int i = 0; i < threadCount; i++) { // spawn threads 2. new FetcherThread(getConf()).start(); 3. } 4. ...... 它根据用户指定的爬行线程数,开启了threadCount个爬行线程抓取网页。其后,当当前活动线程数>0时,就报告线程的状态。直到当前线程数为0时,run方法的主线程也退出。活跃线程数activeThreads是当前活跃的FetchThread的线程的个数。 接下来就分析用于抓取网页的类FetcherThread,该类同样是Fetcher类的一个内部类,继承了Thread,继而可以使用多线程。Fetcher类的run方法中启动了FetchThread的线程,调用其中的run方法,下面对其进行分析。 Java代码 1. public void run() { 2. activeThreads.incrementAndGet(); // count threads 3. FetchItem fit = null; 4. try { 5. while (true) { 6. fit = fetchQueues.getFetchItem(); // 使用Iterator从queues中取出第一个FetchItemQueue,再从queue中取出第一个FetchItem 7. if (fit == null) { 8. if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) { // queue没有准备好返回fetchitem,等待 9. LOG.debug(getName() + \g ...\ 10. // spin-wait. 11. spinWaiting.incrementAndGet(); 12. try { 13. Thread.sleep(500); // 阻塞500ms后重新调度 14. } catch (Exception e) { 15. } 16. spinWaiting.decrementAndGet(); 17. continue; 18. } else { // 该线程的所有工作都已完成,退出 19. // all done, finish this thread 20. return; 21. } 22. } 进入run方法后,首先为当前活跃线程数activeThreads加1,接下来进入一个死循环,进行网页的抓取。它首先调用getFetchItem方法,如果当前inProgress队列的大小>=每台主机允许的并发爬行线程数,则返回null,否则从所有FetchItemQueue中取出一个已经准备好(已经超过爬行间隔时间)的FetchItem,并加入到inProgress队列中。如果该值为空,则有两种可能,一种是出于礼貌性,队列不能返回FetchItem,则线程阻塞500毫秒后继续请求。另外一种可能是当前线程要的所有工作都已完成,则线程退出。 注意,同一主机对应的FetchItemQueue拥有相同的爬行间隔时间时间(得到爬行间隔时间的方法见后面的分析),在该queue中的一个FetchItem被爬行后,会设置这个队列的下一次请求时间,只有超过这个时间的队列才能返回新的FetchItem。这样便实现了对同一个主机的礼貌性访问控制。 接下来又进入一个循环,循环的终止条件是达到最大重定向次数或者不允许重定向。 Java代码 1. do { 2. redirecting = false; 3. Protocol protocol = this.protocolFactory .getProtocol(fit.url.toString()); 4. RobotRules rules = protocol.getRobotRules(fit.url, 5. fit.datum); // 获取爬行规则 6. // rules实际上是RobotRuleSet类的实例,该类实现了RobotRules接口 7. if (!rules.isAllowed(fit.u)) { // unblock 8. fetchQueues.finishFetchItem(fit, true); 9. if (LOG.isDebugEnabled()) { 10. LOG.debug(\ 11. } 12. output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, 13. CrawlDatum.STATUS_FETCH_GONE); 14. continue; 15. } 16. if (rules.getCrawlDelay() > 0) { 17. if (rules.getCrawlDelay() > maxCrawlDelay) {// unblock 18. fetchQueues.finishFetchItem(fit, true); 19. output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE); 20. continue; 21. } else { 22. FetchItemQueue fiq = fetchQueues .getFetchItemQueue(fit.queueID); 23. fiq.crawlDelay = rules.getCrawlDelay(); 24. } 25. } 首先根据要爬行的url确定其协议类型并初始化一个对应的Protocol对象,以下以http协议举例,则protocol初始化为org.apache.nutch.protocol包中的http类,其基类org.apache.nutch.protocol.http.api包中的HttpBase类。接着,生成一个RobotRule类型的对象,得到该url对应的站点的robot.txt文件,该文件中规定了爬虫的爬行限制。 如果不允许爬行,则调用finishFetchItem方法从inProgress集合中删除该fit。并设置该url对应datum的status为STATUS_FETCH_GONE,设置下一次爬行时间nextFetchTime。收集 1. ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum); 2. ProtocolStatus status = output.getStatus(); 3. Content content = output.getContent(); 4. ParseStatus pstatus = null; 5. fetchQueues.finishFetchItem(fit); protocol对象调用HttpBase类中的getProtocolOutput方法,该方法再调用Http类中的getResponse方法使用socket获取所请求文件的内容。返回一个ProtocolOutput类的对象。这个类中包含了对应协议请求的输出,包括 Content类的内容和ProtocolStatus类的协议状态。然后调用finishFetchItem方法将fit从inProgress队列中移出,解除对该主机队列的阻塞。 接着,通过协议状态码来进行下一步的工作。协议状态码是前面进行http请求获得响应后根据响应的内容填写的,包括WOULDBLOCK,SUCCESS等状态,每种状态处理的方式不同,以下介绍请求后成功获得一个网页的response之后的处理过程 Java代码 1. switch (status.getCode()) { 2. case ProtocolStatus.WOULDBLOCK: 3. ...... 4. case ProtocolStatus.SUCCESS: // got a page 5. pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS); updateStatus(content.getContent().length); // 增加网页数量和字节数 6. if (pstatus != null && pstatus.isSuccess() && pstatus.getMinorCode() == parseStatus.SUCCESS_REDIRECT) { 7. String newUrl = pstatus.getMessage(); 8. int refreshTime = Integer.valueOf(pstatus 9. .getArgs()[1]); 10. Text redirUrl = handleRedirect( fit.url,fit.datum, 11. urlString, newUrl, 12. refreshTime < Fetcher.PERM_REFRESH_TIME,Fetcher.CONTENT_REDIR); 13. if (redirUrl != null) { 14. CrawlDatum newDatum = new CrawlDatum( CrawlDatum.STATUS_DB_UNFETCHED, fit.datum.getFetchInterval(), 15. fit.datum.getScore()); 16. if (reprUrl != null) { 17. newDatum.getMetaData().put( Nutch.WRITABLE_REPR_URL_KEY, 18. new Text(reprUrl)); } 19. fit = FetchItem.create(redirUrl, 20. newDatum, byIP); 21. if (fit != null) { 22. FetchItemQueue fiq = fetchQueues .getFetchItemQueue(fit.queueID); fiq.addInProgressFetchItem(fit); 23. } else { // stop redirecting 24. redirecting = false; 25. } 26. } 27. } 28. break; 29. case...: 首先,调用output方法,该方法做了以下几个工作:(1)向链接对应的datum写入STATUS_FETCH_SUCCESS状态和当前时间作为抓取时间。向content对应的metadata中写入segment_name_key,score_key,fetch_status_key各自对应的值,其中score_key对应的得分由ScoringFilter算得。(2) 如果配置了parse则进行解析,解析的结果返回为ParseResult类的对象。(3)收集 在output方法返回后,调用updateStatus增加当前爬行的网页数和字节数。如果经过解析,网页中有重定向链接,则将重定向的链接加入到FetchItemQueue中。 在当重定向次数大于规定次数或者不允许重定向时,退出循环。 最后,将当前活跃进程数减1。网页抓取线程结束。 Nutch 1.0源代码分析(3): Fetch -- 3 文章分类:互联网 4.3 小结 最后我们从更高的一个视角来回顾爬行过程中的一些问题。 4.3.1 线程模型 Fetcher采用了经典的生产者消费者模型,生产者是QueueFeeder,用于根据主机/协议对向FetchItemQueues中不断地添加待爬行的FetchItem,只要FetchItemQueues队列中还有空间并且有输入数据,就添加,否则阻塞。消费者自然是FetchThread,它负责抓取网页。爬虫总共的线程数由配置决定,默认值是10,还可以指定对于同一个主机队列最大的爬行线程数,它们之和不超过总共的线程数。每个主机队列的最大线程数决定了每个主机队列中inProgress队列的大小,出于礼貌性一般对于同一个主机,只采用一个线程爬行。当待爬行的全部网页被爬行完后,爬行线程退出。 4.3.2 Mapper的分析 在了结了Fetch阶段的整个流程后,我们不禁要问,在分布式的情况下,不同的task node是怎样实现避免爬行相同网站的呢? 这要从Generate阶段说起,回顾Generate的第2次MapReduce,在Mapper输出的时候是以相同主机名作为分发到Reducer的依据的,因此各个Reducer生成了若干个文件,每个文件对应自己处理的那个主机对应的 Nutch 1.0源代码分析(4): Parse 文章分类:互联网 Parse阶段和Fetch同属于一个循环层次中,在Fetch后由Parse阶段对抓取的内容进行解析。该阶段的入口是: Java代码 1. public void parse(Path segment) throws IOException 方法。其中就是对即将启动的Parse阶段的MapReduce任务进行配置,主要的配置如下: Java代码 1. FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME)); 2. job.set(Nutch.SEGMENT_NAME_KEY, segment.getName()); 3. job.setInputFormat(SequenceFileInputFormat.class); 4. job.setMapperClass(ParseSegment.class); 5. job.setReducerClass(ParseSegment.class); 6. 7. FileOutputFormat.setOutputPath(job, segment); 8. job.setOutputFormat(ParseOutputFormat.class); 9. job.setOutputKeyClass(Text.class); 10. job.setOutputValueClass(ParseImpl.class); 其中输入路径就是Fetch阶段保存下来的segment/content目录,其中键值对的类型是 map方法 在进行配置之后就启动任务,首先分析map方法: Java代码 1. public void map(WritableComparable key, Content content, 2. OutputCollector 3. throws IOException { 4. // convert on the fly from old UTF8 keys 5. if (key instanceof UTF8) { 6. newKey.set(key.toString()); 7. key = newKey; 8. } 9. 10. int status = 11. Integer.parseInt(content.getMetadata().get(Nutch.FETCH_STATUS_KEY)); // 获取fetch阶段的状态 12. if (status != CrawlDatum.STATUS_FETCH_SUCCESS) { 13. // content not fetched successfully, skip document 14. LOG.debug(\ successfully\15. return; 16. } 17. ParseResult parseResult = null; 18. try { 19. parseResult = new ParseUtil(getConf()).parse(content); 20. } catch (Exception e) { 21. LOG.warn(\ingifyException(e)); 22. return; 23. } map首先获取Fetch阶段的状态,如果没有抓取成功,则返回,跳过这个网页,如果成功则调用ParseUtil的parse方法,对页面进行解析,该方法实现如下: Java代码 1. parsers = this.parserFactory.getParsers(content.getContentType(), 2. content.getUrl() != null ? content.getUrl():\ 首先根据网页的类型找出可以解析这个网页的全部parser,再依次调用每个parser的getParse方法对网页进行解析,只要有一个parser成功解析了网页就返回给map方法的parseResult。对于HTML来说,使用HtmlParser作为其解析器。 Java代码 1. public ParseResult getParse(Content content) { // 解析content 2. HTMLMetaTags metaTags = new HTMLMetaTags(); 3. ...... 4. DocumentFragment root; 5. try { 6. byte[] contentInOctets = content.getContent(); 7. InputSource input = new InputSource(new ByteArrayInputStream(contentInOctets)); 8. ...... 9. input.setEncoding(encoding); 10. if (LOG.isTraceEnabled()) { LOG.trace(\11. root = parse(input); 12.} 13....... 14.HTMLMetaProcessor.getMetaTags(metaTags, root, base); // 获取网页的meta标签中的属性 15. if (LOG.isTraceEnabled()) { 16. LOG.trace(\ing()); 17. } 首先对输入的内容进行处理,行程InputSource类型的对象,传入parse方法,该方法调用第三方库Neko(还有另外一个TagSoup)将输入的数据形成DOM树,并将头节点返回给root。接着,调用HTMLMetaProcessor的getMetaTags获取网页中meta标签的属性。meta标签中包含了许多对搜索引擎爬虫有用的信息,指明了哪些网页可以检索,哪些链接不能跟踪等,在这个方法中用到的一些标签对应的变量如下: noIndex 允许搜索引擎访问这个URL,但不允许搜索引擎索引它,且不允许在搜索结果页面显示 noFollow 允许搜索引擎访问这个URL,允许搜索引擎索引它,在搜索结果页面显示,但不传递PR值 noCache 是否缓存页面 refresh 是否重定向 baseHref refreshTime 重定向跳转的时间 refreshHref 重定向到的链接 接下来就根据meta标签中规定的内容进行进一步的解析: Java代码 1. if (!metaTags.getNoIndex()) { // okay to index 2. StringBuffer sb = new StringBuffer(); 3. if (LOG.isTraceEnabled()) { LOG.trace(\ } 4. utils.getText(sb, root); // extract text 提取网页内容 5. text = sb.toString(); 6. sb.setLength(0); 7. if (LOG.isTraceEnabled()) { LOG.trace(\; } 8. utils.getTitle(sb, root); // extract title 提取标题 9. title = sb.toString().trim(); 10.} 如果该网页允许索引,则从DOM树的title标签下取出网页的标题,并提取去掉标签后的网页内容。 Java代码 1. if (!metaTags.getNoFollow()) { // okay to follow links ArrayList 2. URL baseTag = utils.getBase(root); 3. if (LOG.isTraceEnabled()) { LOG.trace(\; } 4. utils.getOutlinks(baseTag!=null?baseTag:base, l, root); outlinks = l.toArray(new Outlink[l.size()]); 5. if (LOG.isTraceEnabled()) { 6. LOG.trace(\ent.getUrl()); 7. } 8. } 如果这个页面可以跟踪出链,则从DOM中提取出所有的出链并赋给outlinks数组。 Java代码 1. if (metaTags.getRefresh()) { // 要重定向 2. status.setMinorCode(ParseStatus.SUCCESS_REDIRECT); 3. status.setArgs(new String[] {metaTags.getRefreshHref().toString(), 4. Integer.toString(metaTags.getRefreshTime())}); 5. } 如果在meta标签中发现需要重定向,则设置解析状态的minorCode为SUCCESS_REDIRECT,并设置好重定向需要的重定向链接和刷新时间参数。 Java代码 1. ParseData parseData = new ParseData(status, title, outlinks, 2. content.getMetadata(), metadata); ParseResult parseResult = ParseResult.createParseResult(content.getUrl(), 3. new ParseImpl(text, parseData)); 最后,将解析结果构造成ParseResult类型,再进行html过滤之后,就返回构造的ParseResult。 这里需要说明,解析结果是层层封装的,最高层是ParseResult,它包含了所有的解析结果。几个包含解析结果的类的关系如下: 如上图所示,每个类中都包含了不同的解析出的内容,ParseResult类中的parseMap映射里包含的就是 在封装好解析结果后,程序返回到ParseSegment类的map方法中: Java代码 1. for (Entry 2. Text url = entry.getKey(); 3. Parse parse = entry.getValue(); 4. ...... 5. byte[] signature = 6. SignatureFactory.getSignature(getConf()).calculate(content, parse); 7. parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, 8. StringUtil.toHexString(signature)); 9. 10. try { 11. scfilters.passScoreAfterParsing(url, content, parse); 12. } catch (ScoringFilterException e) { 13. ...... 14. } 15. } 16. output.collect(url, new ParseImpl(new ParseText(parse.getText()), parse.getData(), parse.isCanonical())); 17. } 使用迭代器依次从parseResult的parseMap中取出 passScoreAfterParsing方法,对于OPIC算法的filter来说,该方法就是将content的metadata中SCORE.KEY对应的得分拷贝到parseData中。 最后,收集map的结果 reduce方法 reduce方法非常简单,就是收集第一个 1. public void reduce(Text key, Iterator 2. OutputCollector 3. throws IOException { 4. output.collect(key, (Writable)values.next()); // collect first value 5. } ParseOutputFormat ParseOutputFormat 虽然看上去只是一个输出格式,但实际上它不仅担负了控制输出的功能还担负了解析页面的作用。该类最主要的方法就是getRecordWriter方法,它为输出返回一个writer。下面对其进行分析: Java代码 1. public RecordWriter 2. JobConf job, String name, Progressable progress) throws IOException { 3. ...... 4. Path out = FileOutputFormat.getOutputPath(job); 5. Path text = new Path(new Path(out, ParseText.DIR_NAME), name); 6. Path data = new Path(new Path(out, ParseData.DIR_NAME), name); 7. Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name); 8. final MapFile.Writer textOut = new MapFile.Writer(job, fs, text 9. .toString(), Text.class, ParseText.class, 10. CompressionType.RECORD, progress); 11. final MapFile.Writer dataOut = new MapFile.Writer(job, fs, data 12. .toString(), Text.class, ParseData.class, compType, progress); 13. final SequenceFile.Writer crawlOut = SequenceFile.createWriter(fs, job, 14. crawl, Text.class, CrawlDatum.class, compType, progress); 该方法初始化了三个输出路径,分别用于三种不同格式的输出,这三个输出将用于三种不同的目的。这三个输出分别对应的输出格式如下: 输出 格式 textOut 该类返回一个RecordWriter类的对象,真正向输出写数据的就是这个类的write方法。接下来对write方法进行分析: Java代码 1. public void write(Text key, Parse parse) throws IOException { 2. String fromUrl = key.toString(); 3. String fromHost = null; 4. String toHost = null; 5. textOut.append(key, new ParseText(parse.getText(), parse.getThemeText())); 6. 7. ParseData parseData = parse.getData(); 8. String sig = parseData.getContentMeta() .get(Nutch.SIGNATURE_KEY); 9. if (sig != null) { 10. byte[] signature = StringUtil.fromHexString(sig); 11. if (signature != null) { 12. // append a CrawlDatum with a signature 13. CrawlDatum d = new CrawlDatum( 14. CrawlDatum.STATUS_SIGNATURE, 0); 15. d.setSignature(signature); 16. crawlOut.append(key, d); } 17. } write方法的输入就是reduce方法收集的 首先向textOut中添加输出 1. ParseStatus pstatus = parseData.getStatus(); 2. if (pstatus != null && pstatus.isSuccess() 3. && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) { // 重定向 参数在HtmlParser中设置 4. String newUrl = pstatus.getMessage(); int refreshTime = Integer.valueOf(pstatus.getArgs()[1]); newUrl = normalizers.normalize(newUrl, 5. URLNormalizers.SCOPE_FETCHER); 6. newUrl = filters.filter(newUrl); 7. String url = key.toString(); 8. if (newUrl != null && !newUrl.equals(url)) { 9. String reprUrl = URLUtil.chooseRepr(url, newUrl, 10. refreshTime < Fetcher.PERM_REFRESH_TIME); // 根据方法中的算法在原始url和重定向到的url中选出一个代表url(representation url) 11. CrawlDatum newDatum = new CrawlDatum(); 12. newDatum.setStatus(CrawlDatum.STATUS_LINKED); 13. if (reprUrl != null && !reprUrl.equals(newUrl)) { 14. newDatum.getMetaData().put( 15. Nutch.WRITABLE_REPR_URL_KEY, 16. new Text(reprUrl)); 17. } 18. crawlOut.append(new Text(newUrl), newDatum); // 添加新解析出的 接下来,程序获取parseData中的parseStatus,如果状态中表明需要重定向(上文介绍过在哪里设置需要重定向的标志)则继续获取重定向到的链接newUrl和刷新时间。如果newUrl和key中的url不同,则调用Util.chooseRepr方法根据该方法的算法(参 见 http://help.yahoo.com/l/nz/yahooxtra/search/webcrawler/slurp-11.html)算则出key和newUrl中的一个代表链接。如果代表连接不是newUrl,则将newUrl的crawlDatum的metaData中WRITABLE_REPR_URL_KEY对应的值写成代表连接reprUrl。并将 1. Outlink[] links = parseData.getOutlinks(); 2. int outlinksToStore = Math.min(maxOutlinks, links.length); 3. ...... 4. List 5. List 首先取出在前面解析出来的outlink,放入links数组中。 Java代码 1. for (int i = 0; i < links.length 2. && validCount < outlinksToStore; i++) { 3. String toUrl = links[i].getToUrl(); 4. // ignore links to self (or anchors within the page) 5. if (fromUrl.equals(toUrl)) { 6. continue; 7. } 8. if (ignoreExternalLinks) { // 忽略链向其它主机的链接 9. try { 10. toHost = new URL(toUrl).getHost().toLowerCase(); 11. } catch (MalformedURLException e) { 12. toHost = null; 13. } 14. if (toHost == null || !toHost.equals(fromHost)) { // external 15. // links 16. continue; // skip it 17. } 18. } 19. try { 20. toUrl = normalizers.normalize(toUrl, 21. URLNormalizers.SCOPE_OUTLINK); // normalize the url 22. toUrl = filters.filter(toUrl); // filter the url 23. if (toUrl == null) { 24. continue; 25. } 26. } catch (Exception e) { 27. continue; 28. } 29. CrawlDatum target = new CrawlDatum( 30.CrawlDatum.STATUS_LINKED, interval); // interval 爬行间隔 31. Text targetUrl = new Text(toUrl); 32. try { 33. scfilters.initialScore(targetUrl, target); 34. } catch (ScoringFilterException e) { 35. LOG.warn(\ 36. + \; 37. target.setScore(0.0f); 38. } 39. 40. targets.add(new SimpleEntry(targetUrl, target)); // 将出链的 41. outlinkList.add(links[i]); // 将出链加入到outlinkList的List中 42. validCount++; 43.} 接下来进入到循环中,直到所有的出链遍历完或者达到存储限制。每次循环从outlinks中取出一个出链,如果限制了链向其它主机的链接,则忽略这样的链接。对toUrl进行正规化和过滤,如果不能通过这些检查则跳到下一次循环。对与能够通过检查的链接,则对其赋予初始得分后将链接及其对应的 crawlDatum以 接着,调用scoring filter的distributeScoreToOutlinks为出链分发得分,在OPIC算法中,分发的方法是父链接的得分/出链的个数。 Java代码 1. for (Entry 2. crawlOut.append(target.getKey(), target.getValue()); 3. } 将带得分的所有通过规范性和过滤检查的出链 1. Outlink[] filteredLinks = outlinkList 2. .toArray(new Outlink[outlinkList.size()]); 3. parseData = new ParseData(parseData.getStatus(), parseData 4. .getTitle(), filteredLinks, parseData.getContentMeta(), 5. parseData.getParseMeta()); 6. dataOut.append(key, parseData); // 向dataOut中添加 将通过过滤的出链存入数组filteredLinks中,并将parseData重新生成。向dataOut中写入这些 1. if (!parse.isCanonical()) { 2. CrawlDatum datum = new CrawlDatum(); 3. datum.setStatus(CrawlDatum.STATUS_FETCH_SUCCESS); 4. String timeString = parse.getData().getContentMeta().get( 5. Nutch.FETCH_TIME_KEY); 6. try { 7. datum.setFetchTime(Long.parseLong(timeString)); 8. } catch (Exception e) { 9. LOG.warn(\10. datum.setFetchTime(System.currentTimeMillis()); 11. } 12. crawlOut.append(key, datum); 13.} 最后,如果是子链接,向crawlOut中添加 小节 由于ParseOutputFormat设计三个输出路径,每个输出路径又不止一次输出,所以在这有必要对所有的输出进行一下小节: textOut:输出的内容:根据reduce输出的 dataOut:write方法最后的带出链得分的 Nutch 1.0源代码分析(5): Index 文章分类:互联网 分析了索引部分之后一直忘了放到博客上来,今天补上。 索引这部分的操作相对比较简单,应用Lucene提供的接口实现索引的功能。在进行了网页数据库和链接数据库的更新之后,crawl程序跳出循环,进行爬行阶段的最后一项工作——索引。索引这一过程在crawl的入口是: Java代码 1. indexer.index(indexes, crawlDb, linkDb, Arrays.asList(HadoopFSUtil.getPaths(fstats))); 该方法位于org.apache.nutch.indexer中,功能是配置并启动索引阶段的MapReduce任务。该方法调用initMRJob方法对MapReduce任务进行配置,配置的内容为:输入路径(每轮爬行中保存的爬行和解析目录,链接数据库目录),Mapper和Reducer所在的类(IndexerMapReduce),输出格式 (IndexerOutputFormat),输出键值对的类型( 1. public void map(Text key, Writable value, 2. OutputCollector 3. output.collect(key, new NutchWritable(value)); 4. } Map的内容很简单,就是将输入原样输出,但是以url作为键分发到reducer上,这样,几种不同输入目录中同一个url对应的不同类型的数据就分发到同一个reducer上了。 接下来逐步分析reduce方法: Java代码 1. public void reduce(Text key, Iterator 4. while (values.hasNext()) { // 判别类型,values 中有对应于同一url的几种数据 5. final Writable value = values.next().get(); // unwrap 6. if (value instanceof Inlinks) { 7. inlinks = (Inlinks)value; 8. } else if (value instanceof CrawlDatum) { 9. ...... 10.} //end of while reduce部分首先将收集的值判断所属类型,由于输入时values中有各种类型的值,因此要分门别类的存储,以便后面的使用,随后,将存储的值填入索引中: Java代码 1. // add segment, used to map from merged index back to segment files 2. doc.add(\ 3. // add digest, used by dedup 4. doc.add(\5. ...... 6. doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks); 7. boost = this.scfilters.indexerScore(key, doc, dbDatum, 8. fetchDatum, parse, inlinks, boost); 9. doc.add(\10.output.collect(key, doc); 调用index的filter向索引中加入其它关心的域,默认采用BasicIndexing作为filter,该类的filter方法将一些基本的field,如title等加入到doc中。调用scroing filter的indexerScore方法计算索引阶段的得分,并将此得分写入boost值,并写入索引。 最后,收集
正在阅读:
Nutch 1.0源代码分析03-30
桩基础方案06-11
献给母亲的话02-14
(9套试卷合集)江苏盐城市2018年小升初数学模拟试卷word文档可编辑09-16
检讨书范文大全03-24
2022-2022年高中地理陕西高二同步测试全真试卷【2】含答案考点及04-14
XX道路改造工程监理规划10-07
当一次推销员作文600字06-19
幼儿园教育目标04-17
重点题及参考答案04-25
- 多层物业服务方案
- (审判实务)习惯法与少数民族地区民间纠纷解决问题(孙 潋)
- 人教版新课标六年级下册语文全册教案
- 词语打卡
- photoshop实习报告
- 钢结构设计原理综合测试2
- 2014年期末练习题
- 高中数学中的逆向思维解题方法探讨
- 名师原创 全国通用2014-2015学年高二寒假作业 政治(一)Word版
- 北航《建筑结构检测鉴定与加固》在线作业三
- XX县卫生监督所工程建设项目可行性研究报告
- 小学四年级观察作文经典评语
- 浅谈110KV变电站电气一次设计-程泉焱(1)
- 安全员考试题库
- 国家电网公司变电运维管理规定(试行)
- 义务教育课程标准稿征求意见提纲
- 教学秘书面试技巧
- 钢结构工程施工组织设计
- 水利工程概论论文
- 09届九年级数学第四次模拟试卷
- 源代码
- 分析
- Nutch
- 1.0
- 2013-2015济宁市中考数学试题《空间与图形》分析
- 北京市大兴区农村四校2014-2015学年高二上学期期中联考物理试卷
- 班主任如何组织班级活动案例
- 大学英语精读第五册课后习题答案
- 广联达GCL2008构件代码应用技巧
- 衡阳城市公共休闲空间布局优化研究
- 车床刀尖半径补偿的运用
- 人教版初三上学期语文复习资料(基础知识) - 图文
- 《C语言程序设计》C函数定义和使用
- 关于评选表彰2011年曲靖师范学院“平安和谐家庭”的通知
- 理想点亮人生2012.11
- 销售价格管理制度
- 农业气象学练习题
- 社会主义核心价值观主题班会活动方案
- 2014年红土地专合社农综项目合作社项目申报书
- 三年级数学教学计划 - 图文
- 扣眼穿刺技术及护理
- 生产企业免抵退申报流程
- 二年级下册的表内除法应用题
- 计算机基础知识选择题考试必备考试真题模拟题