Basic Architecture of an Offline Analysis System with Flume + Hadoop + Hive

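The overall architecture of the offline analysis system is as follows: Flume collects log files from the FTP servers and stores them in Hadoop HDFS; Hadoop MapReduce then cleans the log files; finally, Hive builds a data warehouse on the cleaned data for offline analysis. Task scheduling is done with shell scripts, although you can also try an automated workflow scheduler such as Azkaban or Oozie.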






This system uses the Nginx access.log as the log file for clickstream analysis. The format of access.log is as follows.

Sample record:

    … - - [18/Sep/2013:06:57:50 +0000] … InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)"

Format breakdown:

2. Visitor user information: - -

3. Request time: [18/Sep/2013:06:57:50 +0000]

10. Visitor's browser: Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)
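As a rough illustration of how such a record breaks down into fields, the following sketch matches one line against the common Nginx "combined" layout. The class name `AccessLogSketch`, the regular expression, and the sample line (including its IP address, request, and referrer) are assumptions made for this example; they are not the patterns used by the parser later in this article.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: splits one access.log line into its nine fields.
final class AccessLogSketch {

    // Made-up sample line in the assumed "combined" layout.
    static final String SAMPLE =
        "192.0.2.10 - - [18/Sep/2013:06:57:50 +0000] \"GET /index.html HTTP/1.1\" "
        + "200 1024 \"http://example.com/\" \"Mozilla/4.0 (compatible; MSIE 8.0)\"";

    // ip, ident, user, [time], "request", status, bytes, "referrer", "user agent"
    private static final Pattern LINE = Pattern.compile(
        "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+) "
        + "\"([^\"]*)\" \"([^\"]*)\"$");

    // Returns the nine fields in order, or null when the line does not match.
    static String[] parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return null;
        }
        String[] fields = new String[m.groupCount()];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = m.group(i + 1);
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] fields = parse(SAMPLE);
        System.out.println("ip=" + fields[0] + " time=" + fields[3] + " status=" + fields[5]);
    }
}
```

Returning null for a non-matching line gives the caller an easy way to drop invalid records, which is the role the first cleaning job plays in this pipeline.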




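A Flume agent runs on the JVM, so a JVM environment is required on every server involved. One Flume agent is deployed per server. Flume collects the log data produced by the web server and wraps it into individual events that are delivered to the agent's Source. The Source consumes these events and puts them into the agent's Channel. The Sink then takes the events from the Channel and either stores them in the local file system or forwards them to the next Flume agent running on another server in the distributed system. Flume provides a point-to-point reliability guarantee: an event is removed from an agent's Channel only after it has been delivered to the Channel of the agent on the next server or has been saved correctly to the local file storage.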

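In this system, a Flume agent is deployed on every FTP server and on the Hadoop name node server. The agents on the FTP servers collect the web server logs and forward them to the agent on the name node server, which finally sinks all the log data into the distributed file system HDFS.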

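Note that this system uses the Spooling Directory Source rather than the Exec Source. When the Flume service goes down, the Spooling Directory Source records the position it last read up to, whereas the Exec Source does not; the user has to handle that, and if it is handled poorly, restarting Flume produces duplicate data. The Spooling Directory Source has drawbacks of its own. It renames files once it has read them, which is one reason for adding the FTP server layer: it keeps Flume from "polluting" the production environment. Another significant drawback is that it cannot flexibly watch for content newly appended to the files in all subdirectories of a folder. There are many ways around these problems, for example choosing a different log collection tool such as Logstash.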


    # Flume agent on the FTP server: spooldir source -> memory channel -> avro sink
    agent.sources = origin
    agent.channels = memorychannel
    agent.sinks = target

    agent.sources.origin.type = spooldir
    agent.sources.origin.spoolDir = /export/data/trivial/weblogs
    agent.sources.origin.channels = memorychannel
    agent.sources.origin.deserializer.maxLineLength = 2048

    agent.sources.origin.interceptors = i2
    agent.sources.origin.interceptors.i2.type = host
    agent.sources.origin.interceptors.i2.hostHeader = hostname

    agent.sinks.loggerSink.type = logger
    agent.sinks.loggerSink.channel = memorychannel

    agent.channels.memorychannel.type = memory
    agent.channels.memorychannel.capacity = 10000

    agent.sinks.target.type = avro
    agent.sinks.target.channel = memorychannel
    agent.sinks.target.hostname =
    agent.sinks.target.port = 4545

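A few parameters need explanation. The Source's deserializer.maxLineLength property sets the maximum length of each event; the default is 2048 characters per event. By default, the size of the memory Channel equals 80% of the maximum memory available to the JVM on the local server; it can be tuned with the byteCapacityBufferPercentage and byteCapacity parameters.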



    # Flume agent on the name node server: avro source -> memory channel -> hdfs sink
    agent.sources = origin
    agent.channels = memorychannel
    agent.sinks = target

    agent.sources.origin.type = avro
    agent.sources.origin.channels = memorychannel
    agent.sources.origin.bind =
    agent.sources.origin.port = 4545

    #agent.sources.origin.interceptors = i1 i2
    #agent.sources.origin.interceptors.i1.type = timestamp
    #agent.sources.origin.interceptors.i2.type = host
    #agent.sources.origin.interceptors.i2.hostHeader = hostname

    agent.sinks.loggerSink.type = logger
    agent.sinks.loggerSink.channel = memorychannel

    agent.channels.memorychannel.type = memory
    agent.channels.memorychannel.capacity = 5000000
    agent.channels.memorychannel.transactionCapacity = 1000000

    agent.sinks.target.type = hdfs
    agent.sinks.target.channel = memorychannel
    agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S
    agent.sinks.target.hdfs.filePrefix = data-%{hostname}
    agent.sinks.target.hdfs.rollInterval = 60
    agent.sinks.target.hdfs.rollSize = 1073741824
    agent.sinks.target.hdfs.rollCount = 1000000
    agent.sinks.target.hdfs.round = true
    agent.sinks.target.hdfs.roundValue = 10
    agent.sinks.target.hdfs.roundUnit = minute
    agent.sinks.target.hdfs.useLocalTimeStamp = true
    agent.sinks.target.hdfs.minBlockReplicas = 1
    agent.sinks.target.hdfs.writeFormat = Text
    agent.sinks.target.hdfs.fileType = DataStream

The three parameters round, roundValue, and roundUnit make the sink create a new folder in HDFS every 10 minutes to hold the data pulled from the FTP servers.
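To make the effect of the rounding concrete, here is a small sketch that mimics how a timestamp is rounded down to a 10-minute boundary before it is substituted into the path pattern /flume/events/%y-%m-%d/%H%M%S. The class `RoundDownSketch` and its methods are hypothetical and only imitate the behaviour; they are not Flume's own code.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of round=true, roundValue=10, roundUnit=minute.
final class RoundDownSketch {

    // Java equivalent of the Flume escape sequence %y-%m-%d/%H%M%S.
    private static final DateTimeFormatter PATH_FORMAT =
        DateTimeFormatter.ofPattern("yy-MM-dd/HHmmss");

    // Rounds the minute down to a multiple of roundValue and clears the smaller units.
    static LocalDateTime roundDownMinutes(LocalDateTime time, int roundValue) {
        int minute = (time.getMinute() / roundValue) * roundValue;
        return time.withMinute(minute).withSecond(0).withNano(0);
    }

    // Directory that would receive an event stamped with the given local time.
    static String bucketPath(LocalDateTime time) {
        return "/flume/events/" + roundDownMinutes(time, 10).format(PATH_FORMAT);
    }

    public static void main(String[] args) {
        // An event at 15:17:42 lands in the 15:10:00 bucket.
        System.out.println(bucketPath(LocalDateTime.of(2016, 5, 30, 15, 17, 42)));
    }
}
```

All events that arrive between 15:10:00 and 15:19:59 map to the same directory, which is why a new folder appears only once every 10 minutes.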



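Note that if Flume splits the collected files into many small files of 1 KB to 5 KB on HDFS, the HDFS Sink is most likely misconfigured and is falling back to its defaults. A spooldir source wraps each line of the files in the watched directory into one event and puts it into the channel; by default, at most 2048 characters are read per line. On the HDFS Sink side, three properties determine the size of the files written to HDFS: rollInterval (default 30 seconds), rollSize (default 1 KB), and rollCount (default 10 events). rollInterval is the number of seconds after which the current .tmp file (which holds the events coming from the channel) is rolled into HDFS; rollSize rolls the .tmp file once it reaches the given size; rollCount rolls the .tmp file once the given number of events has been written to it.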
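The interplay of the three roll properties can be sketched as a single predicate: the current file is rolled as soon as any enabled threshold is reached, and a value of 0 disables that threshold. The class `RollPolicySketch` below is a hypothetical simplification for illustration; in Flume itself the interval is driven by a timer rather than checked per event.

```java
// Hypothetical model of the HDFS Sink roll decision.
final class RollPolicySketch {

    private final long rollIntervalSeconds;
    private final long rollSizeBytes;
    private final long rollCount;

    RollPolicySketch(long rollIntervalSeconds, long rollSizeBytes, long rollCount) {
        this.rollIntervalSeconds = rollIntervalSeconds;
        this.rollSizeBytes = rollSizeBytes;
        this.rollCount = rollCount;
    }

    // Defaults named in the text: 30 seconds, 1 KB, 10 events.
    static RollPolicySketch defaults() {
        return new RollPolicySketch(30, 1024, 10);
    }

    // True when any enabled threshold (value greater than 0) has been reached.
    boolean shouldRoll(long openSeconds, long bytesWritten, long eventsWritten) {
        return (rollIntervalSeconds > 0 && openSeconds >= rollIntervalSeconds)
            || (rollSizeBytes > 0 && bytesWritten >= rollSizeBytes)
            || (rollCount > 0 && eventsWritten >= rollCount);
    }

    public static void main(String[] args) {
        RollPolicySketch tuned = new RollPolicySketch(60, 1073741824L, 1000000);
        // Ten short log lines after one second: the defaults roll, the tuned values do not.
        System.out.println(defaults().shouldRoll(1, 500, 10));
        System.out.println(tuned.shouldRoll(1, 500, 10));
    }
}
```

With the defaults, ten log lines are enough to close a file, which explains the flood of tiny files; the values used in the configuration above (60 seconds, 1 GB, 1,000,000 events) make the time limit the usual trigger.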


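If the content written to HDFS does not match the original log lines, the cause is again the HDFS Sink defaults. hdfs.writeFormat defaults to Writable, which serializes the original file content into an HDFS format; set it explicitly to hdfs.writeFormat=Text. hdfs.fileType defaults to SequenceFile, which concatenates all events into one line; set it explicitly to hdfs.fileType=DataStream so that each event occupies one line and the output keeps the format of the original file.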


Once the log data has been pulled into HDFS, MapReduce programs clean it. In the first step, a MapReduce job filters out the invalid records.

    package com.guludada.clickstream;

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import com.guludada.dataparser.WebLogParser;

    // Note: … marks a literal that is missing from the source listing.
    public class logClean {

        public static class cleanMap extends Mapper<Object, Text, Text, NullWritable> {

            private NullWritable v = NullWritable.get();
            private Text word = new Text();
            WebLogParser webLogParser = new WebLogParser();

            @Override
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {

                // Convert the input line to a String
                String line = value.toString();

                // parser() returns the cleaned record, or a sentinel value for an invalid line
                String cleanContent = webLogParser.parser(line);

                // Keep only valid records (compare with equals(), not !=)
                if (!cleanContent.equals(…)) {
                    word.set(cleanContent);
                    context.write(word, v);
                }
            }
        }

        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();
            conf.set(…);

            Job job = Job.getInstance(conf);
            job.setJarByClass(logClean.class);

            // Mapper class used by this job
            job.setMapperClass(cleanMap.class);

            // Key/value types of the mapper output
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);

            // Directory holding the job's raw input files
            Date curDate = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat(…);
            String dateStr = sdf.format(curDate);
            FileInputFormat.setInputPaths(job, new Path(… + dateStr + …));

            // Directory for the job's output
            FileOutputFormat.setOutputPath(job, new Path(…));

            // Submit the job configuration and the jar containing the job's classes to YARN
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
        }
    }

    package com.guludada.dataparser;

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import com.guludada.javabean.WebLogBean;

    /**
     * Uses regular expressions to pick out the valid log records.
     * Note: … marks a literal that is missing from the source listing.
     */
    public class WebLogParser {

        public String parser(String weblog_origin) {

            WebLogBean weblogbean = new WebLogBean();

            // Extract the IP address
            Pattern IPPattern = Pattern.compile(…);
            Matcher IPMatcher = IPPattern.matcher(weblog_origin);
            if (IPMatcher.find()) {
                String IPAddr = IPMatcher.group(0);
                weblogbean.setIP_addr(IPAddr);
            } else {
                return …;
            }

            // Extract the request time
            Pattern TimePattern = Pattern.compile(…);
            Matcher TimeMatcher = TimePattern.matcher(weblog_origin);
            if (TimeMatcher.find()) {
                String time = TimeMatcher.group(1);
                String[] cleanTime = time.split(…);
                weblogbean.setTime(cleanTime[0]);
            } else {
                return …;
            }

            // Extract the remaining request information
            Pattern InfoPattern = Pattern.compile(…);
            Matcher InfoMatcher = InfoPattern.matcher(weblog_origin);
            if (InfoMatcher.find()) {

                String requestInfo = InfoMatcher.group(1).replace(…);
                String[] requestInfoArry = requestInfo.split(…);
                weblogbean.setMethod(requestInfoArry[0]);
                weblogbean.setRequest_URL(requestInfoArry[1]);
                weblogbean.setRequest_protocol(requestInfoArry[2]);

                String status_code = InfoMatcher.group(2);
                weblogbean.setRespond_code(status_code);

                String respond_data = InfoMatcher.group(3);
                weblogbean.setRespond_data(respond_data);

                String request_come_from = InfoMatcher.group(4).replace(…);
                weblogbean.setRequst_come_from(request_come_from);

                String browserInfo = InfoMatcher.group(5).replace(…);
                weblogbean.setBrowser(browserInfo);
            } else {
                return …;
            }

            return weblogbean.toString();
        }
    }

    package com.guludada.javabean;

    public class WebLogBean {

        String IP_addr;
        String time;
        String method;
        String request_URL;
        String request_protocol;
        String respond_code;
        String respond_data;
        String requst_come_from;
        String browser;

        public String getIP_addr() { return IP_addr; }
        public void setIP_addr(String iP_addr) { IP_addr = iP_addr; }

        public String getTime() { return time; }
        public void setTime(String time) { this.time = time; }

        public String getMethod() { return method; }
        public void setMethod(String method) { this.method = method; }

        public String getRequest_URL() { return request_URL; }
        public void setRequest_URL(String request_URL) { this.request_URL = request_URL; }

        public String getRequest_protocol() { return request_protocol; }
        public void setRequest_protocol(String request_protocol) { this.request_protocol = request_protocol; }

        public String getRespond_code() { return respond_code; }
        public void setRespond_code(String respond_code) { this.respond_code = respond_code; }

        public String getRespond_data() { return respond_data; }
        public void setRespond_data(String respond_data) { this.respond_data = respond_data; }

        public String getRequst_come_from() { return requst_come_from; }
        public void setRequst_come_from(String requst_come_from) { this.requst_come_from = requst_come_from; }

        public String getBrowser() { return browser; }
        public void setBrowser(String browser) { this.browser = browser; }

        @Override
        public String toString() {
            // … marks the separator literals and fields that are missing from the source listing
            return IP_addr + … + request_URL + …;
        }
    }




    package com.guludada.clickstream;

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.Date;
    import java.util.UUID;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import com.guludada.dataparser.SessionParser;
    import com.guludada.javabean.WebLogSessionBean;

    // Note: … marks a literal that is missing from the source listing.
    public class logSession {

        public static class sessionMapper extends Mapper<Object, Text, Text, Text> {

            private Text IPAddr = new Text();
            private Text content = new Text();

            @Override
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {

                // Convert the input line to a String
                String line = value.toString();
                String[] weblogArry = line.split(…);

                // Group the records by IP address
                IPAddr.set(weblogArry[0]);
                content.set(line);
                context.write(IPAddr, content);
            }
        }

        static class sessionReducer extends Reducer<Text, Text, Text, NullWritable> {

            private Text content = new Text();
            private NullWritable v = NullWritable.get();
            SimpleDateFormat sdf = new SimpleDateFormat(…);
            SessionParser sessionParser = new SessionParser();

            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {

                Date sessionStartTime = null;
                String sessionID = UUID.randomUUID().toString();

                // Sort all browsing records of the user behind this IP address by time
                ArrayList<WebLogSessionBean> sessionBeanGroup = new ArrayList<WebLogSessionBean>();
                for (Text browseHistory : values) {
                    WebLogSessionBean sessionBean = sessionParser.loadBean(browseHistory.toString());
                    sessionBeanGroup.add(sessionBean);
                }
                Collections.sort(sessionBeanGroup, new Comparator<WebLogSessionBean>() {
                    public int compare(WebLogSessionBean sessionBean1, WebLogSessionBean sessionBean2) {
                        Date date1 = sessionBean1.getTimeWithDateFormat();
                        Date date2 = sessionBean2.getTimeWithDateFormat();
                        // Null-safe: records without a parsable time sort first
                        if (date1 == null) return date2 == null ? 0 : -1;
                        if (date2 == null) return 1;
                        return date1.compareTo(date2);
                    }
                });

                for (WebLogSessionBean sessionBean : sessionBeanGroup) {

                    Date visitTime = timeTransform(sessionBean.getTime());

                    if (sessionStartTime == null) {
                        // First visit of this user in the day's log
                        sessionStartTime = visitTime;
                    } else if (timeDiffer(sessionStartTime, visitTime) > 30 * 60 * 1000) {
                        // More than 30 minutes since the session started:
                        // the current record opens the next session
                        sessionStartTime = visitTime;
                        sessionID = UUID.randomUUID().toString();
                    }

                    // The original listing skipped the record that opens a new session
                    // (via continue); it is written here so that no record is lost.
                    content.set(sessionParser.parser(sessionBean, sessionID));
                    context.write(content, v);
                }
            }

            private Date timeTransform(String time) {
                Date standard_time = null;
                try {
                    standard_time = sdf.parse(time);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                return standard_time;
            }

            private long timeDiffer(Date start_time, Date end_time) {
                return end_time.getTime() - start_time.getTime();
            }
        }

        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();
            conf.set(…);

            Job job = Job.getInstance(conf);
            job.setJarByClass(logSession.class);

            // Mapper and reducer classes used by this job
            job.setMapperClass(sessionMapper.class);
            job.setReducerClass(sessionReducer.class);

            // Key/value types of the mapper output
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Key/value types of the final output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            Date curDate = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat(…);
            String dateStr = sdf.format(curDate);

            // Directory holding the job's input files
            FileInputFormat.setInputPaths(job, new Path(…));

            // Directory for the job's output
            FileOutputFormat.setOutputPath(job, new Path(…));

            // Submit the job configuration and the jar containing the job's classes to YARN
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
        }
    }

    package com.guludada.dataparser;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;

    import com.guludada.javabean.WebLogSessionBean;

    // Note: … marks a literal that is missing from the source listing.
    public class SessionParser {

        SimpleDateFormat sdf_origin = new SimpleDateFormat(…);
        SimpleDateFormat sdf_final = new SimpleDateFormat(…);

        public String parser(WebLogSessionBean sessionBean, String sessionID) {
            sessionBean.setSession(sessionID);
            return sessionBean.toString();
        }

        public WebLogSessionBean loadBean(String sessionContent) {

            WebLogSessionBean weblogSession = new WebLogSessionBean();

            String[] contents = sessionContent.split(…);
            weblogSession.setTime(timeTransform(contents[1]));
            weblogSession.setIP_addr(contents[0]);
            weblogSession.setRequest_URL(contents[3]);
            weblogSession.setReferal(contents[7]);

            return weblogSession;
        }

        // Converts the time from the original log format to the output format
        private String timeTransform(String time) {
            Date standard_time = null;
            try {
                standard_time = sdf_origin.parse(time);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return sdf_final.format(standard_time);
        }
    }

    package com.guludada.javabean;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    // Note: … marks a literal that is missing from the source listing.
    public class WebLogSessionBean {

        String time;
        String IP_addr;
        String session;
        String request_URL;
        String referal;

        public String getTime() { return time; }
        public void setTime(String time) { this.time = time; }

        public String getIP_addr() { return IP_addr; }
        public void setIP_addr(String iP_addr) { IP_addr = iP_addr; }

        public String getSession() { return session; }
        public void setSession(String session) { this.session = session; }

        public String getRequest_URL() { return request_URL; }
        public void setRequest_URL(String request_URL) { this.request_URL = request_URL; }

        public String getReferal() { return referal; }
        public void setReferal(String referal) { this.referal = referal; }

        // Returns the time as a Date, or null when it is missing or cannot be parsed
        public Date getTimeWithDateFormat() {
            SimpleDateFormat sdf_final = new SimpleDateFormat(…);
            if (this.time != null && !this.time.equals(…)) {
                try {
                    return sdf_final.parse(this.time);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
            return null;
        }

        @Override
        public String toString() {
            return time + … + request_URL + …;
        }
    }

The session information produced by the second cleaning step has the following structure:

    Time                  IP   SessionID   Requested URL       Referral URL
    2015-05-30 19:38:00   …    Session1    /blog/me            www.http://www.wodefanwen.com/
    2015-05-30 19:39:00   …    Session1    /blog/me/details    www.mysite.com/blog/me
    2015-05-30 19:38:00   …    Session2    /blog/me            www.http://www.wodefanwen.com/
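The session-splitting rule of this step can be shown in isolation. The sketch below takes the time-sorted request times of one IP address and assigns a session number to each, opening a new session once a request falls more than 30 minutes after the start of the current one, which is the comparison the reducer makes. The class `SessionSplitSketch` is hypothetical, and plain integers stand in for the UUID session IDs of the real job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Hadoop-free version of the 30-minute session rule.
final class SessionSplitSketch {

    static final long TIMEOUT_MS = 30L * 60 * 1000;

    // Input: request times in milliseconds, sorted ascending, for one IP address.
    // Output: the session number (1, 2, 3, ...) assigned to each request.
    static List<Integer> assignSessions(List<Long> sortedTimesMs) {
        List<Integer> sessionIds = new ArrayList<>();
        int session = 0;
        long sessionStart = 0;
        for (long time : sortedTimesMs) {
            // The first request, or one later than start + 30 minutes, opens a new session.
            if (session == 0 || time - sessionStart > TIMEOUT_MS) {
                session++;
                sessionStart = time;
            }
            sessionIds.add(session);
        }
        return sessionIds;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<Long> times = Arrays.asList(0L, 1 * minute, 31 * minute, 32 * minute, 62 * minute);
        System.out.println(assignSessions(times));
    }
}
```

Measuring from the session start, as done here, caps every session at 30 minutes; measuring from the previous request instead would be the usual inactivity timeout, and only the comparison inside the loop would change.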


    package com.guludada.clickstream;

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.Date;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import com.guludada.dataparser.PageViewsParser;
    import com.guludada.javabean.PageViewsBean;

    // Note: … marks a literal that is missing from the source listing.
    public class PageViews {

        public static class pageMapper extends Mapper<Object, Text, Text, Text> {

            private Text word = new Text();

            @Override
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {

                String line = value.toString();
                String[] webLogContents = line.split(…);

                // Group the records by session
                word.set(webLogContents[2]);
                context.write(word, value);
            }
        }

        public static class pageReducer extends Reducer<Text, Text, Text, NullWritable> {

            private Text content = new Text();
            private NullWritable v = NullWritable.get();
            PageViewsParser pageViewsParser = new PageViewsParser();

            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {

                // Sort all browsing records of this session by time
                ArrayList<PageViewsBean> pageViewsBeanGroup = new ArrayList<PageViewsBean>();
                for (Text pageView : values) {
                    PageViewsBean pageViewsBean = pageViewsParser.loadBean(pageView.toString());
                    pageViewsBeanGroup.add(pageViewsBean);
                }
                Collections.sort(pageViewsBeanGroup, new Comparator<PageViewsBean>() {
                    public int compare(PageViewsBean pageViewsBean1, PageViewsBean pageViewsBean2) {
                        Date date1 = pageViewsBean1.getTimeWithDateFormat();
                        Date date2 = pageViewsBean2.getTimeWithDateFormat();
                        // Null-safe: records without a parsable time sort first
                        if (date1 == null) return date2 == null ? 0 : -1;
                        if (date2 == null) return 1;
                        return date1.compareTo(date2);
                    }
                });

                // Previous record of this session. Kept local so that no state
                // carries over from one session (reduce call) to the next.
                PageViewsBean lastStayPageBean = null;
                Date lastVisitTime = null;

                // Compute the stay time of each page
                int step = 0;
                for (PageViewsBean pageViewsBean : pageViewsBeanGroup) {

                    Date curVisitTime = pageViewsBean.getTimeWithDateFormat();

                    if (lastStayPageBean != null) {
                        // Time between two consecutive visits, in seconds. The tail of this
                        // expression is missing from the source; the division by 1000 is
                        // assumed from the original comment that states the unit.
                        long timeDiff = (curVisitTime.getTime() - lastVisitTime.getTime()) / 1000;

                        // The stay time of the previous page is now known, so emit that record.
                        // (The original listing wrote each record before its stay time was set.)
                        lastStayPageBean.setStayTime(String.valueOf(timeDiff));
                        content.set(pageViewsParser.parser(lastStayPageBean));
                        context.write(content, v);
                    }

                    // Update the step number of the visit
                    step++;
                    pageViewsBean.setStep(String.valueOf(step));

                    // The current record becomes the previous record
                    lastStayPageBean = pageViewsBean;
                    lastVisitTime = curVisitTime;
                }

                // The last page of the session keeps the default stay time set by loadBean()
                if (lastStayPageBean != null) {
                    content.set(pageViewsParser.parser(lastStayPageBean));
                    context.write(content, v);
                }
            }
        }

        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();
            conf.set(…);

            Job job = Job.getInstance(conf);
            job.setJarByClass(PageViews.class);

            // Mapper and reducer classes used by this job
            job.setMapperClass(pageMapper.class);
            job.setReducerClass(pageReducer.class);

            // Key/value types of the mapper output
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Key/value types of the final output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            Date curDate = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat(…);
            String dateStr = sdf.format(curDate);

            // Directory holding the job's input files
            FileInputFormat.setInputPaths(job, new Path(…));

            // Directory for the job's output
            FileOutputFormat.setOutputPath(job, new Path(…));

            // Submit the job configuration and the jar containing the job's classes to YARN
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
        }
    }

    package com.guludada.dataparser;

    import com.guludada.javabean.PageViewsBean;

    // Note: … marks a literal that is missing from the source listing.
    public class PageViewsParser {

        /**
         * Loads a PageViewsBean from one line of the logSession output.
         */
        public PageViewsBean loadBean(String sessionContent) {

            PageViewsBean pageViewsBean = new PageViewsBean();

            String[] contents = sessionContent.split(…);
            pageViewsBean.setTime(contents[0] + …);
            pageViewsBean.setIP_addr(contents[2]);
            pageViewsBean.setSession(contents[3]);
            pageViewsBean.setVisit_URL(contents[4]);
            pageViewsBean.setStayTime(…);
            pageViewsBean.setStep(…);

            return pageViewsBean;
        }

        public String parser(PageViewsBean pageBean) {
            return pageBean.toString();
        }
    }

    package com.guludada.javabean;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    // Note: … marks a literal that is missing from the source listing.
    public class PageViewsBean {

        String session;
        String IP_addr;
        String time;
        String visit_URL;
        String stayTime;
        String step;

        public String getSession() { return session; }
        public void setSession(String session) { this.session = session; }

        public String getIP_addr() { return IP_addr; }
        public void setIP_addr(String iP_addr) { IP_addr = iP_addr; }

        public String getTime() { return time; }
        public void setTime(String time) { this.time = time; }

        public String getVisit_URL() { return visit_URL; }
        public void setVisit_URL(String visit_URL) { this.visit_URL = visit_URL; }

        public String getStayTime() { return stayTime; }
        public void setStayTime(String stayTime) { this.stayTime = stayTime; }

        public String getStep() { return step; }
        public void setStep(String step) { this.step = step; }

        // Returns the time as a Date, or null when it is missing or cannot be parsed
        public Date getTimeWithDateFormat() {
            SimpleDateFormat sdf_final = new SimpleDateFormat(…);
            if (this.time != null && !this.time.equals(…)) {
                try {
                    return sdf_final.parse(this.time);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
            return null;
        }

        @Override
        public String toString() {
            return session + … + visit_URL + …;
        }
    }

The PageViews data produced by the third cleaning step has the following structure:

    SessionID   IP   Visit time            Visited page         Stay time   Step
    Session1    …    2016-05-30 15:17:30   /blog/me             30000       1
    Session1    …    2016-05-30 15:18:00   /blog/me/admin       30000       2
    Session1    …    2016-05-30 15:18:30   /home                30000       3
    Session2    …    2016-05-30 15:16:30   /products            30000       1
    Session2    …    2016-05-30 15:17:00   /products/details    30000       2
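The stay-time calculation behind this table reduces to differences between consecutive visit times within one session. The following sketch shows it without any Hadoop types. The class `StayTimeSketch` and the `defaultSeconds` parameter are hypothetical; the default stay time that the article's own parser assigns to the last page of a session is not recoverable from the listing.

```java
import java.util.Arrays;

// Hypothetical, Hadoop-free version of the stay-time calculation.
final class StayTimeSketch {

    // Input: visit times of one session in milliseconds, sorted ascending.
    // Output: per-page stay time in seconds; the last page has no following
    // visit, so it receives defaultSeconds.
    static long[] stayTimesSeconds(long[] sortedVisitTimesMs, long defaultSeconds) {
        int n = sortedVisitTimesMs.length;
        long[] stay = new long[n];
        for (int i = 0; i < n; i++) {
            if (i + 1 < n) {
                stay[i] = (sortedVisitTimesMs[i + 1] - sortedVisitTimesMs[i]) / 1000;
            } else {
                stay[i] = defaultSeconds;
            }
        }
        return stay;
    }

    public static void main(String[] args) {
        // Visits at 0 s, 30 s, and 90 s; -1 marks the unknown stay time of the last page.
        long[] stay = stayTimesSeconds(new long[] {0L, 30_000L, 90_000L}, -1L);
        System.out.println(Arrays.toString(stay));
    }
}
```

The index of each element plus one corresponds to the step number shown in the last column of the table.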


1. package com.guludada.clickstream; 2.

3. import java.io.IOException; 4. import java.text.ParseException; 5. import java.text.SimpleDateFormat; 6. import java.util.ArrayList; 7. import java.util.Collections; 8. import java.util.Comparator; 9. import java.util.Date; 10. import java.util.HashMap; 11. import java.util.Map; 12.

13. import org.apache.hadoop.conf.Configuration; 14. import org.apache.hadoop.fs.Path;

15. import org.apache.hadoop.io.NullWritable; 16. import org.apache.hadoop.io.Text; 17. import org.apache.hadoop.mapreduce.Job; 18. import org.apache.hadoop.mapreduce.Mapper; 19. import org.apache.hadoop.mapreduce.Reducer;

20. import org.apache.hadoop.mapreduce.Reducer.Context;

21. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 22. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 23.

24. import com.guludada.clickstream.PageViews.pageMapper; 25. import com.guludada.clickstream.PageViews.pageReducer; 26. import com.guludada.clickstream.logClean.cleanMap; 27. import com.guludada.dataparser.PageViewsParser; 28. import com.guludada.dataparser.VisitsInfoParser; 29. import com.guludada.javabean.PageViewsBean; 30.

31. public class VisitsInfo { 32.

33. public static class visitMapper extends Mapper {


35. private Text word = new Text(); 36.

37. public void map(Object key,Text value,Context context) { 38.

39. String line = value.toString();

40. String[] webLogContents = line.split(\ 41.

42. //根据session来分组

43. word.set(webLogContents[2]); 44. try {

45. context.write(word,value); 46. } catch (IOException e) {

47. // TODO Auto-generated catch block 48. e.printStackTrace();

49. } catch (InterruptedException e) { 50. // TODO Auto-generated catch block 51. e.printStackTrace(); 52. } 53. } 54. } 55.

56. public static class visitReducer extends Reducer

ritable>{ 57.

58. private Text content = new Text();

59. private NullWritable v = NullWritable.get();

60. VisitsInfoParser visitsParser = new VisitsInfoParser();

61. SimpleDateFormat sdf = new SimpleDateFormat(\

62. PageViewsParser pageViewsParser = new PageViewsParser();

63. Map viewedPagesMap = new HashMap();


65. String entry_URL = \ 66. String leave_URL = \ 67. int total_visit_pages = 0; 68.

69. @Override

70. protected void reduce(Text key, Iterable values, Context conte

xt) throws IOException, InterruptedException { 71.

72. //将session所对应的所有浏览记录按时间排序

73. ArrayList browseInfoGroup = new ArrayList(); 74. for(Text browseInfo : values) {

75. browseInfoGroup.add(browseInfo.toString()); 76. }

77. Collections.sort(browseInfoGroup,new Comparator() { 78.

79. SimpleDateFormat sdf = new SimpleDateFormat(\


80. public int compare(String browseInfo1, String browseInfo2) {

81. String dateStr1 = browseInfo1.split(\


82. String dateStr2 = browseInfo2.split(\


83. Date date1; 84. Date date2; 85. try {

86. date1 = sdf.parse(dateStr1);

87. date2 = sdf.parse(dateStr2); 88. if(date1 == null && date2 == null) return 0; 89. return date1.compareTo(date2); 90. } catch (ParseException e) {

91. // TODO Auto-generated catch block 92. e.printStackTrace(); 93. return 0; 94. } 95. } 96. }); 97.

98. //统计该session访问的总页面数,第一次进入的页面,跳出的页

99. for(String browseInfo : browseInfoGroup) { 100.

101. String[] browseInfoStrArr = browseInfo.split(\

102. String curVisitURL = browseInfoStrArr[3];

103. Integer curVisitURLInteger = viewedPagesMap.get(curVisitURL


104. if(curVisitURLInteger == null) { 105. viewedPagesMap.put(curVisitURL, 1); 106. } 107. }

108. total_visit_pages = viewedPagesMap.size();

109. String visitsInfo = visitsParser.parser(browseInfoGroup, total_


110. content.set(visitsInfo); 111. try {

112. context.write(content,v); 113. } catch (IOException e) {

114. // TODO Auto-generated catch block

115. e.printStackTrace();

116. } catch (InterruptedException e) { 117. // TODO Auto-generated catch block 118. e.printStackTrace(); 119. } 120. } 121. } 122.

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        conf.set(\

        Job job = Job.getInstance(conf);

        job.setJarByClass(VisitsInfo.class);

        // Specify the mapper/reducer classes used by this job
        job.setMapperClass(visitMapper.class);
        job.setReducerClass(visitReducer.class);

        // Specify the key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Specify the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        Date curDate = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat(\
        String dateStr = sdf.format(curDate);

        // Specify the directory that holds the job's raw input files
        FileInputFormat.setInputPaths(job, new Path(\

        // Specify the directory for the job's output
        FileOutputFormat.setOutputPath(job, new Path(\
        fo\

        // Submit the job's configuration, together with the jar containing its Java classes, to YARN for execution
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);

    }
}
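The core of the reducer (sort one session's page views by time, then count the distinct pages) can be sketched in isolation. This is a minimal sketch, not the author's code: the class name `SessionSummary`, the tab-separated record layout `session, ip, time, url`, and the timestamp pattern are all assumptions made for illustration, because the original split delimiters are cut off in the listing above.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper; assumes records shaped like "session\tip\tyyyy-MM-dd HH:mm:ss\turl".
final class SessionSummary {

    static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    // Parse the third field as a timestamp; an unparseable value yields null.
    static Date timeOf(String record) {
        try {
            return new SimpleDateFormat(TIME_PATTERN).parse(record.split("\t")[2]);
        } catch (ParseException e) {
            return null;
        }
    }

    // Return a copy of the records ordered by timestamp; records without a valid time sort first.
    static List<String> sortByTime(List<String> records) {
        List<String> sorted = new ArrayList<>(records);
        sorted.sort((a, b) -> {
            Date d1 = timeOf(a);
            Date d2 = timeOf(b);
            if (d1 == null || d2 == null) {
                return d1 == d2 ? 0 : (d1 == null ? -1 : 1);
            }
            return d1.compareTo(d2);
        });
        return sorted;
    }

    // Count the distinct URLs (fourth field) visited in the session.
    static int distinctPages(List<String> records) {
        Set<String> pages = new HashSet<>();
        for (String record : records) {
            pages.add(record.split("\t")[3]);
        }
        return pages.size();
    }

    public static void main(String[] args) {
        List<String> group = new ArrayList<>();
        group.add("s1\t10.0.0.1\t2016-05-30 15:19:00\t/home");
        group.add("s1\t10.0.0.1\t2016-05-30 15:17:00\t/blog/me");
        List<String> sorted = sortByTime(group);
        System.out.println("entry page record: " + sorted.get(0));
        System.out.println("exit page record:  " + sorted.get(sorted.size() - 1));
        System.out.println("distinct pages:    " + distinctPages(group));
    }
}
```

Using a set for the distinct count avoids the map whose values are never read, and handling a null timestamp explicitly keeps the comparator from throwing when only one of the two records fails to parse.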

package com.guludada.dataparser;

import java.util.ArrayList;

import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.VisitsInfoBean;
import com.guludada.javabean.WebLogSessionBean;

public class VisitsInfoParser {

    // pageViewsGroup holds one session's page-view records, already sorted by time
    public String parser(ArrayList<String> pageViewsGroup,String totalVisitNum) {

        VisitsInfoBean visitsBean = new VisitsInfoBean();
        String entryPage = pageViewsGroup.get(0).split(\
        String leavePage = pageViewsGroup.get(pageViewsGroup.size()-1).split
        String startTime = pageViewsGroup.get(0).split(\
        String endTime = pageViewsGroup.get(pageViewsGroup.size()-1).split(\
        \
        String session = pageViewsGroup.get(0).split(\
        String IP = pageViewsGroup.get(0).split(\
        String referal = pageViewsGroup.get(0).split(\

        visitsBean.setSession(session);
        visitsBean.setStart_time(startTime);
        visitsBean.setEnd_time(endTime);
        visitsBean.setEntry_page(entryPage);
        visitsBean.setLeave_page(leavePage);
        visitsBean.setVisit_page_num(totalVisitNum);
        visitsBean.setIP_addr(IP);
        visitsBean.setReferal(referal);

        return visitsBean.toString();
    }
}

package com.guludada.javabean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class VisitsInfoBean {

    String session;
    String start_time;
    String end_time;
    String entry_page;
    String leave_page;
    String visit_page_num;
    String IP_addr;
    String referal;

    public String getSession() {
        return session;
    }
    public void setSession(String session) {
        this.session = session;
    }
    public String getStart_time() {
        return start_time;
    }
    public void setStart_time(String start_time) {
        this.start_time = start_time;
    }
    public String getEnd_time() {
        return end_time;
    }
    public void setEnd_time(String end_time) {
        this.end_time = end_time;
    }
    public String getEntry_page() {
        return entry_page;
    }
    public void setEntry_page(String entry_page) {
        this.entry_page = entry_page;
    }
    public String getLeave_page() {
        return leave_page;
    }
    public void setLeave_page(String leave_page) {
        this.leave_page = leave_page;
    }
    public String getVisit_page_num() {
        return visit_page_num;
    }
    public void setVisit_page_num(String visit_page_num) {
        this.visit_page_num = visit_page_num;
    }
    public String getIP_addr() {
        return IP_addr;
    }
    public void setIP_addr(String iP_addr) {
        IP_addr = iP_addr;
    }
    public String getReferal() {
        return referal;
    }
    public void setReferal(String referal) {
        this.referal = referal;
    }

    @Override
    public String toString() {
        return session + \
            + \
            + \
    }

}

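The visit-record table produced by the fourth log-cleaning pass has the following structure.

Columns: SessionID, visit time, leave time, entry page, exit page, total pages visited, IP

Sample values, listed per column:
SessionID: Session1, Session2, Session3
Visit time: 2016-05-30 15:17:00, 2016-05-30 14:17:00, 2016-05-30 12:17:00
Leave time: 2016-05-30 15:19:00, 2016-05-30 15:19:38, 2016-05-30 15:40:00
Entry page / exit page: /blog/me, /home, /blog/others, /profile, /detail
Total pages visited: 5, 10, 6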


MapReduce Troubleshooting

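How to pass all files under a folder path as the input of a MapReduce job:

1. Paths in the HDFS file system support wildcard (glob) patterns, so a single input path pattern can select every file under a directory.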
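To get a feel for how such a pattern selects files, the sketch below uses the JDK's own glob matcher as a local stand-in. It is an illustration only and not the Hadoop API: the basic wildcards (`*`, `?`, `[..]`, `{..}`) behave similarly, and in a real job the pattern would simply be the `Path` handed to `FileInputFormat.setInputPaths`. The directory layout `/flume/events/<date>/` and the class name `GlobDemo` are assumptions.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Local illustration of glob semantics with java.nio; not the HDFS implementation.
final class GlobDemo {

    // True when the path matches the glob pattern.
    static boolean matches(String glob, String path) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return matcher.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        // Hypothetical layout: one sub-directory per day, log files inside each.
        String glob = "/flume/events/*/*.log";
        System.out.println(matches(glob, "/flume/events/2016-05-17/access.log"));
        System.out.println(matches(glob, "/flume/events/2016-05-17/access.tmp"));
    }
}
```

A `*` stays within one directory level, which is why the pattern needs one `*` for the date directory and another for the file name.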






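In this system we use a star schema to build the ODS (Operational Data Store) layer of the data warehouse. The commands below can be run by starting Hive's hiveserver2 server and connecting with the beeline client, or they can be put into a script and scheduled to run periodically.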




>> create table pageviews(session string,ip string,requestdate string,requesttime string,visitpage string, staytime string,step string) comment 'this is the table for pageviews' partitioned by(inputDate string) clustered by(session) sorted by(requestdate,requesttime) into 4 buckets row format delimited fields terminated by ' ';


>> load data inpath '/clickstream/pageviews' overwrite into table pageviews partition(inputDate='2016-05-17');




>> create table ods_pageviews(session string,ip string,viewtime string,visitpage string, staytime string,step string) partitioned by(inputDate string) clustered by(visitpage) sorted by(viewtime) into 4 buckets row format delimited fields terminated by ' ';

>> insert into table ods_pageviews partition(inputDate='2016-05-17') select

pv.session,pv.ip,concat(pv.requestdate,\pageviews as pv where pv.inputDate='2016-05-17';


>>create table ods_dim_pageviews_time(time string,year string,month string,day string,hour string,minutes string,seconds string) partitioned by(inputDate String) clustered by(year,month,day) sorted by(time) into 4 buckets row format delimited fields terminated by ' ';

>> insert overwrite table ods_dim_pageviews_time partition(inputDate='2016-05-17') select distinct pv.viewtime,

substring(pv.viewtime,0,4),substring(pv.viewtime,6,2),substring(pv.viewtime,9,2),substring(pv.viewtime,12,2),substring(pv.viewtime,15,2),substring(pv.viewtime,18,2) from ods_pageviews as pv;
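The offsets in the statement above are easier to check with a small sketch. It assumes that viewtime has the form `yyyy-MM-dd HH:mm:ss` (the concat that builds it is cut off in the source) and that Hive's substring uses 1-based positions with position 0 treated like position 1. `TimeDimension` and `hiveSubstring` are hypothetical names, and negative positions are deliberately left out.

```java
// Sketch of Hive-style substring(str, pos, len) applied to a view time.
final class TimeDimension {

    // 1-based start position; 0 is treated like 1. Negative positions are not covered here.
    static String hiveSubstring(String s, int pos, int len) {
        if (pos < 0) {
            throw new IllegalArgumentException("negative positions are not covered by this sketch");
        }
        int start = pos == 0 ? 0 : pos - 1;
        if (start >= s.length() || len <= 0) {
            return "";
        }
        int end = Math.min(s.length(), start + len);
        return s.substring(start, end);
    }

    // Returns year, month, day, hour, minutes, seconds using the same offsets as the Hive statement.
    static String[] split(String viewtime) {
        return new String[] {
            hiveSubstring(viewtime, 0, 4),
            hiveSubstring(viewtime, 6, 2),
            hiveSubstring(viewtime, 9, 2),
            hiveSubstring(viewtime, 12, 2),
            hiveSubstring(viewtime, 15, 2),
            hiveSubstring(viewtime, 18, 2)
        };
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", split("2013-09-19 06:57:50")));
    }
}
```

With a different separator width between date and time, every offset after the day would shift, so the dimension columns depend directly on the exact viewtime format.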


>> create table ods_dim_pageviews_url(visitpage string,host string,path string,query string) partitioned by(inputDate string) clustered by(visitpage) sorted by(visitpage) into 4 buckets row format delimited fields terminated by ' ';

>> insert into table ods_dim_pageviews_url partition(inputDate='2016-05-17') select distinct

pv.visitpage,b.host,b.path,b.query from pageviews pv lateral view

parse_url_tuple(concat('https://localhost',pv.visitpage),'HOST','PATH','QUERY') b as host,path,query;
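The statement above prefixes each visited page with `https://localhost` so that it becomes a complete URL before it is split into host, path and query. The same idea can be sketched with the JDK's `java.net.URI`; this is only an analogy for what parse_url_tuple extracts, and the class name `UrlParts` and the sample page are assumptions.

```java
import java.net.URI;

// Sketch: split a site-relative page into host, path and query, as the dimension table stores them.
final class UrlParts {

    // Prefix the relative page with a placeholder origin, then read the three components.
    static String[] hostPathQuery(String visitPage) {
        URI uri = URI.create("https://localhost" + visitPage);
        // getQuery() returns null when the page has no query string.
        return new String[] { uri.getHost(), uri.getPath(), uri.getQuery() };
    }

    public static void main(String[] args) {
        String[] parts = hostPathQuery("/blog/me?id=3&tab=2");
        System.out.println("host=" + parts[0] + " path=" + parts[1] + " query=" + parts[2]);
    }
}
```

Because every page receives the same placeholder host, only the path and query columns carry information for internal pages; the host column becomes meaningful for external URLs such as referrers.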


>> select op.visitpage as path,count(*) as num from ods_pageviews as op join

ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' group by op.visitpage sort by num desc limit 20;
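The query above groups page views by page, counts them, and keeps the 20 most visited pages. The same aggregation over an in-memory list can be sketched as follows; `TopPages` and the sample data are hypothetical, and ties are broken by page name only to make the output deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of "group by page, count, order by count desc, limit n" over a list of visited pages.
final class TopPages {

    static List<Map.Entry<String, Long>> topN(List<String> visitPages, int n) {
        // Group identical pages and count occurrences.
        Map<String, Long> counts = visitPages.stream()
                .collect(Collectors.groupingBy(page -> page, Collectors.counting()));
        // Sort by count descending, then by page name, and keep the first n entries.
        return counts.entrySet().stream()
                .sorted((a, b) -> {
                    int byCount = Long.compare(b.getValue(), a.getValue());
                    return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
                })
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> pages = Arrays.asList("/home", "/detail", "/home", "/index", "/detail", "/home");
        for (Map.Entry<String, Long> entry : topN(pages, 2)) {
            System.out.println(entry.getKey() + " " + entry.getValue());
        }
    }
}
```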





>> create table visitsinfo(session string,startdate string,starttime string,enddate string,endtime string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session) sorted by(startdate,starttime) into 4 buckets row format delimited fields terminated by ' ';


>> load data inpath '/clickstream/visitsinfo' overwrite into table visitsinfo partition(inputDate='2016-05-18');


>> create table ods_visits(session string,entrytime string,leavetime string,entrypage string,leavepage string,viewpagenum string,ip string,referal string) partitioned by(inputDate string) clustered by(session)

sorted by(entrytime) into 4 buckets row format delimited fields terminated by ' ';

>> insert into table ods_visits partition(inputDate='2016-05-18') select

vi.session,concat(vi.startdate,\,vi.viewpagenum,vi.ip,vi.referal from visitsinfo as vi where vi.inputDate='2016-05-18';


>>create table ods_dim_visits_time(time string,year string,month string,day string,hour string,minutes string,seconds string) partitioned by(inputDate String) clustered by(year,month,day) sorted by(time) into 4 buckets row format delimited fields terminated by ' ';


>>insert overwrite table ods_dim_visits_time partition(inputDate='2016-05-18') select distinct ov.timeparam,

substring(ov.timeparam,0,4),substring(ov.timeparam,6,2),substring(ov.timeparam,9,2),substring(ov.timeparam,12,2),substring(ov.timeparam,15,2),substring(ov.timeparam,18,2) from (select ov1.entrytime as timeparam from ods_visits as ov1 union select ov2.leavetime as timeparam from ods_visits as ov2) as ov;


>> create table ods_dim_visits_url(pageurl string,host string,path string,query string) partitioned by(inputDate string) clustered by(pageurl) sorted by(pageurl) into 4 buckets row format delimited fields

terminated by ' ';


>>insert into table ods_dim_visits_url partition(inputDate='2016-05-18') select distinct

ov.pageurl,b.host,b.path,b.query from (select ov1.entrypage as pageurl from ods_visits as ov1 union select ov2.leavepage as pageurl from ods_visits as ov2 ) as ov lateral view

parse_url_tuple(concat('https://localhost',ov.pageurl),'HOST','PATH','QUERY') b as host,path,query;


>>insert into table ods_dim_visits_url partition(inputDate='2016-05-18') select distinct ov.referal,b.host,b.path,b.query from ods_visits as ov lateral view parse_url_tuple(ov.referal,'HOST','PATH','QUERY') b as host,path,query;


>> select ov.leavepage as jumpPage, count(*) as jumpNum from ods_visits as ov group by ov.leavepage order by jumpNum desc;


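Hive has no keyword for declaring an auto-increment column when creating a table; a user-defined function (UDF) is needed to get that behaviour. In queries, row_number() can be used to number the rows, but it must be used in complete mode, so row_number() is combined with the windowing clause over(), as in the example that follows. For simplicity, we create a temporary table here and manually insert the business page URLs we want to examine together with each page's total PV, and then use these values to compute the conversion rate between business pages, which is the so-called funnel model.

Suppose we have the business page conversion flow "/index" -> "/detail" -> "/createOrder" -> "/confirmOrder".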

First we create a temporary table for the PV of the business pages; the temporary table and its data are cleaned up when the session ends.

>> create temporary table transactionpageviews(url string,views int) row format delimited fields terminated by ' ';

Next we compute the total PV of each business page and insert it into the transactionpageviews table in the order of the conversion steps.

>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/index' group by opurl.path;

>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/detail' group by opurl.path;

>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/createOrder' group by opurl.path;

>> insert into table transactionpageviews select opurl.path as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2013' and optime.month='09' and optime.day='19' and opurl.path='/confirmOrder' group by opurl.path;
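Once the four PV totals are stored in step order, the funnel itself is simple arithmetic: each step's conversion rate is its PV divided by the PV of the previous step. The sketch below shows that calculation on its own; the class name `Funnel` and the PV numbers are made up for illustration and are not results from the system described here.

```java
// Sketch of the funnel calculation over PV totals listed in step order.
final class Funnel {

    // Rate of each step relative to the previous one; the first step is defined as 1.0.
    static double[] stepRates(long[] views) {
        double[] rates = new double[views.length];
        for (int i = 0; i < views.length; i++) {
            if (i == 0) {
                rates[i] = 1.0;
            } else {
                // Guard against division by zero when the previous step has no views.
                rates[i] = views[i - 1] == 0 ? 0.0 : (double) views[i] / views[i - 1];
            }
        }
        return rates;
    }

    public static void main(String[] args) {
        String[] steps = { "/index", "/detail", "/createOrder", "/confirmOrder" };
        long[] views = { 1000, 400, 100, 50 }; // hypothetical PV totals
        double[] rates = stepRates(views);
        for (int i = 0; i < steps.length; i++) {
            System.out.println(steps[i] + " " + views[i] + " " + rates[i]);
        }
    }
}
```

The array index plays the role that a row number plays in the table: it is what lets each step be paired with the one directly before it.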


