
Building a Search Engine with HBase

Static Search Engine

The previous section already set up a Hadoop + Spark cluster. xuranus@172.17.11.40 is the local machine, and six nodes run under NAT on it:

Cluster Overview

xuranusMaster@172.16.173.136
xuranusSlave1@172.16.173.137
xuranusSlave2@172.16.173.138
zookeeper001@172.16.173.139
zookeeper002@172.16.173.140
zookeeper003@172.16.173.141

HBase is deployed on 172.16.173.136~139.

Creating the Table

Create a new table in the HBase shell: create 'school_tb','attr'

The table is populated with a partial list of universities crawled from Baike, provided by https://github.com/s-top/Baike-KnowledgeGraph. The column family attr holds the attributes chinese_name, english_name, famous_alumni, found_time, location and major_department (the code below also inserts abbreviation), and the school name is used as the rowkey.
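The table can also be created programmatically instead of through the shell. Below is a minimal sketch (not from the original post) using the HBase 1.x Java Admin API; the index table keyword_map used later would be created the same way with a cf column family.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateSchoolTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zookeeper001,zookeeper002,zookeeper003");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("school_tb"));
            desc.addFamily(new HColumnDescriptor("attr"));   // single column family 'attr'
            if (!admin.tableExists(desc.getTableName())) {
                admin.createTable(desc);
            }
        }
    }
}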

The Java code for inserting the data:

public static void buildSchoolData() {
    String path = "/home/xuranus/Desktop/hadoop/homework/all.json";
    Gson gson = new Gson();
    JsonParser jsonParser = new JsonParser();
    String jsonStr = "";
    String lineStr;
    try {
        // read the whole JSON file into a single string
        BufferedReader in = new BufferedReader(new FileReader(new File(path)));
        while ((lineStr = in.readLine()) != null) {
            jsonStr = jsonStr.concat(lineStr);
        }
        in.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
    JsonObject jsonObject = jsonParser.parse(jsonStr).getAsJsonObject();

    JsonArray schools = jsonObject.get("data").getAsJsonArray();
    for (JsonElement school : schools) {
        String chinese_name = school.getAsJsonObject().get("中文名").getAsString();
        String location = school.getAsJsonObject().get("所属地区").getAsString();
        String abbreviation = school.getAsJsonObject().get("简称").getAsString();
        String major_department = school.getAsJsonObject().get("主要院系").getAsString();
        String found_time = school.getAsJsonObject().get("创办时间").getAsString();
        String english_name = school.getAsJsonObject().get("英文名").getAsString();
        String famous_alumni = school.getAsJsonObject().get("知名校友").getAsString();

        try {
            // one cell per attribute, with the school name as the rowkey
            // (the put() helper and the shared conf are defined in the Test3 class below)
            put("school_tb", chinese_name, "attr", "chinese_name", chinese_name);
            put("school_tb", chinese_name, "attr", "location", location);
            put("school_tb", chinese_name, "attr", "abbreviation", abbreviation);
            put("school_tb", chinese_name, "attr", "major_department", major_department);
            put("school_tb", chinese_name, "attr", "found_time", found_time);
            put("school_tb", chinese_name, "attr", "english_name", english_name);
            put("school_tb", chinese_name, "attr", "famous_alumni", famous_alumni);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
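buildSchoolData relies on the static put() helper and the shared conf field of the Test3 class shown in the next section, so the configuration has to be initialised before calling it. A hypothetical driver (not part of the original code; HBaseConfiguration is assumed here instead of the plain Configuration used in start()) could look like:

public static void main(String[] args) throws Exception {
    // hypothetical entry point: initialise the shared HBase configuration used by put()
    Test3.conf = HBaseConfiguration.create();   // org.apache.hadoop.hbase.HBaseConfiguration
    Test3.conf.set("hbase.zookeeper.quorum", "zookeeper001,zookeeper002,zookeeper003");
    Test3.conf.set("hbase.zookeeper.property.clientPort", "2181");
    buildSchoolData();
}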

Building the Index

The Chinese text is segmented into words and term frequencies are counted to build the inverted index:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

public class Test3 {
    public static Configuration conf;

    // helper: write a single cell to the given table
    public static void put(String tablename, String row, String columnFamily, String column, String data) throws Exception {
        HTable table = new HTable(Test3.conf, tablename);
        Put p1 = new Put(Bytes.toBytes(row));
        p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));
        table.put(p1);
        System.out.println("put '" + row + "','" + columnFamily + ":" + column + "','" + data + "'");
    }

    public static class TokenizerMapper extends TableMapper<Text, Text> {
        private Text word = new Text();      // "lexeme:rowkey"
        private Text valueInfo = new Text(); // term frequency ("1" per occurrence)

        public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            List<Cell> cs = value.listCells();
            for (Cell cell : cs) {
                // run each cell value through the IK Analyzer Chinese segmenter
                byte[] bt = cell.getValue();
                InputStream ip = new ByteArrayInputStream(bt);
                Reader read = new InputStreamReader(ip);
                IKSegmenter iks = new IKSegmenter(read, true);
                Lexeme t;

                while ((t = iks.next()) != null) {
                    String rowkey = new String(key.get());
                    word.set(t.getLexemeText().concat(":").concat(rowkey));
                    valueInfo.set("1");
                    context.write(word, valueInfo);
                }
            }
        }
    }

    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
        private Text info = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // sum the occurrences of "lexeme:rowkey"
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            // re-key on the lexeme only; the value becomes "rowkey:count"
            int splitIndex = key.toString().indexOf(":");
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            key.set(key.toString().substring(0, splitIndex));
            context.write(key, info);
        }
    }


    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            // build the posting list: "rowkey1:count1;rowkey2:count2;..."
            String fileList = new String();
            for (Text value : values) {
                fileList += value.toString() + ";";
            }
            result.set(fileList);
            context.write(key, result);
            try {
                System.out.println("-------InvertedIndexReducer-------");
                System.out.println("key=" + key.toString());
                System.out.println("value=" + fileList);
                System.out.println("----------------------------------");
                // persist the inverted index into the keyword_map table
                put("keyword_map", key.toString(), "cf", "data", fileList);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void start() throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        System.setProperty("hadoop.home.dir", "/home/xuranus/Desktop/hadoop/hadoop-2.7.6");
        conf = new Configuration();
        String tablename = "school_tb";
        String columnFamily = "attr";
        conf.set("hbase.zookeeper.quorum", "zookeeper001,zookeeper002,zookeeper003");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.master", "172.16.173.136:16000");

        conf.addResource("/home/xuranus/hdfs-site.xml");

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(Test3.class);

        // scan every attribute column of school_tb as the mapper input
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("chinese_name"));
        scan.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("location"));
        scan.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("abbreviation"));
        scan.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("major_department"));
        scan.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("found_time"));
        scan.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("english_name"));
        scan.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("famous_alumni"));

        TableMapReduceUtil.initTableMapperJob(tablename, scan, TokenizerMapper.class, Text.class, Text.class, job);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileOutputFormat.setOutputPath(job, new Path("hdfs://172.16.173.136:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The output directory contains entries like the following:

1902年	东南大学:1;北京师范大学:1;南京大学:1;
1903年 中南大学:1;湖南大学:1;
1905年 中国农业大学:1;复旦大学:1;
1907年 同济大学:1;
....

UI and Querying

Querying is implemented as a Java web application with a Spring Boot backend. It receives the input keyword, looks it up in the index table, gets back the posting string, splits and processes that string, fetches each referenced rowkey from school_tb, collects the results into an array and sends them to the frontend as JSON.
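The backend code itself is not listed in the post; a minimal sketch of that lookup flow is shown below. The table, column family and column names (keyword_map, cf:data, school_tb, attr) come from the code above, while the controller class, endpoint and per-request connection handling are assumptions.

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SearchController {

    @GetMapping("/search")
    public List<Map<String, String>> search(@RequestParam String keyword) throws IOException {
        List<Map<String, String>> results = new ArrayList<>();
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zookeeper001,zookeeper002,zookeeper003");
        // a real application would reuse one Connection instead of opening it per request
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table indexTable = conn.getTable(TableName.valueOf("keyword_map"));
             Table schoolTable = conn.getTable(TableName.valueOf("school_tb"))) {

            // 1. look up the posting string, e.g. "南京大学:1;东南大学:1;"
            Result indexRow = indexTable.get(new Get(Bytes.toBytes(keyword)));
            byte[] postings = indexRow.getValue(Bytes.toBytes("cf"), Bytes.toBytes("data"));
            if (postings == null) {
                return results; // keyword not indexed
            }

            // 2. fetch every referenced school row and collect its attributes
            for (String entry : Bytes.toString(postings).split(";")) {
                if (entry.isEmpty()) {
                    continue;
                }
                String rowkey = entry.substring(0, entry.lastIndexOf(':'));
                Result school = schoolTable.get(new Get(Bytes.toBytes(rowkey)));
                if (school.isEmpty()) {
                    continue;
                }
                Map<String, String> attrs = new HashMap<>();
                school.getFamilyMap(Bytes.toBytes("attr")).forEach(
                        (q, v) -> attrs.put(Bytes.toString(q), Bytes.toString(v)));
                results.add(attrs); // serialised to JSON by Spring
            }
        }
        return results;
    }
}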

The frontend is built with React: it receives the results, renders them, and highlights the keywords with a regular expression. The result looks like this:

With that, a simple word-count search engine is complete. However, it can only search static resources; as a next step, we can use Spark to add new documents to the index dynamically.

Dynamic Search Engine

Cluster Overview

There are six nodes in total:

xuranusMaster:172.16.173.136
xuranusSlave1:172.16.173.137
xuranusSlave2:172.16.173.138

zookeeper001:172.16.173.139
zookeeper002:172.16.173.140
zookeeper003:172.16.173.141

Service distribution across the nodes:

Hadoop: xuranusMaster xuranusSlave1 xuranusSlave2
HBase: xuranusMaster xuranusSlave1 xuranusSlave2
ZooKeeper: zookeeper001 zookeeper002 zookeeper003
Spark: xuranusMaster xuranusSlave1 xuranusSlave2
Kafka: xuranusMaster xuranusSlave1 xuranusSlave2

Algorithm Overview

This experiment builds a real-time stream-processing pipeline with Kafka + Spark Streaming.

First, a dozen or so Wikipedia articles about programming languages were downloaded and stored locally. A Kafka producer written in Scala reads the files and sends their text to the message queue (one article every 2 s, simulating a crawler).

Producer: KafkaProducerThread.scala

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.io.Source

class KafkaProducerThread extends Thread {
  override def run(): Unit = {
    val topic = "test"
    val brokers = "xuranusMaster:9092,xuranusSlave1:9092,xuranusSlave2:9092"
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("group.id", "xuranus")
    props.put("client.id", "ScalaProducerExample")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    println("Start sending simulated articles.")
    for (i <- 1 to 14) {
      // read one file and publish it: key = file path, value = article text
      val fileName = "/home/xuranus/spark_homework/article/" + i + ".txt"
      val source = Source.fromFile(fileName)
      val str = source.mkString
      source.close()
      val record = new ProducerRecord[String, String](topic, fileName, str)
      producer.send(record)
      println("produced " + i + " articles!")
      Thread.sleep(2000) // one article every 2 s, simulating a crawler
    }
    producer.close()
  }
}

Spark Streaming acts as the Kafka consumer and takes the key (the storage path) and value (the article text) from the message queue. For the data received in each RDD, punctuation is first stripped with regular expressions to clean the text; the text is then split into words, term frequencies are counted, and the resulting index is stored in HBase.

Consumer: KafkaStreamingConsumer.scala

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

class KafkaStreamingConsumer extends Thread {
  Logger.getLogger("org").setLevel(Level.OFF) // silence unrelated Spark/Hadoop logging
  Logger.getLogger("akka").setLevel(Level.OFF)

  // standalone helper that writes one index cell (not used inside run() below)
  @transient
  def insert(word: String, index: String, count: String): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "zookeeper001,zookeeper002,zookeeper003")
    conf.set("hbase.master", "xuranusMaster:16000")
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin()

    val table = connection.getTable(TableName.valueOf("articles"))
    val theput = new Put(Bytes.toBytes(word))
    theput.addColumn(Bytes.toBytes("articlesCF"), Bytes.toBytes(index), Bytes.toBytes(count))
    table.put(theput)
  }

  override def run() = {
    System.setProperty("hadoop.home.dir", "/home/xuranus/Desktop/hadoop/hadoop-2.7.6")
    System.setProperty("log4j.rootCategory", "ERROR")
    val conf = new SparkConf().setAppName("app").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    val kafkaParam = Map(
      "metadata.broker.list" -> "xuranusMaster:9092,xuranusSlave1:9092,xuranusSlave2:9092",
      "bootstrap.servers" -> "xuranusMaster:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "earliest",
      "group.id" -> "tweet-consumer"
    )

    val preferredHosts = LocationStrategies.PreferConsistent

    val topics = Array("test")
    // consume the article stream from Kafka
    val stream =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        preferredHosts,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParam)
      )

    stream.map(record => (record.key().toString, record.value().toString))
      .foreachRDD(rdd => {
        rdd.foreachPartition(partition => {
          partition.foreach(s => {
            val path = s._1
            val content = s._2
            // clean the text, then count term frequencies
            val wordCounts = content
              .replaceAll("[0123456789]", " ")
              .replaceAll("\\[", " ")
              .replaceAll("\\]", " ")
              .replaceAll("[,.:;]", " ")
              .replaceAll("\"", " ")
              .replaceAll("'", " ")
              .replaceAll("\n", " ")
              .replaceAll("\\(", " ")
              .replaceAll("\\)", " ")
              .toLowerCase()
              .split(" ")
              .map((_, 1)).groupBy(_._1).map(x => (x._1, x._2.length))

            wordCounts.filter(pair => pair._1.length > 0).foreach(pair => {
              val word = pair._1
              val count = pair._2 + ""
              val index = path
              println((word, index, count)) // print the index entry about to be written

              // note: opening a connection per word is simple but costly; acceptable for this demo
              val conf = HBaseConfiguration.create()
              conf.set("hbase.zookeeper.property.clientPort", "2181")
              conf.set("hbase.zookeeper.quorum", "zookeeper001,zookeeper002,zookeeper003")
              conf.set("hbase.master", "xuranusMaster:16000")
              val connection = ConnectionFactory.createConnection(conf)

              // persist the keyword index: row = word, column = article path, value = count
              val table1 = connection.getTable(TableName.valueOf("articles"))
              val put1 = new Put(Bytes.toBytes(word))
              put1.addColumn(Bytes.toBytes("articlesCF"), Bytes.toBytes(index), Bytes.toBytes(count))
              table1.put(put1)

              // persist the article text: row = article path, column = content
              val table2 = connection.getTable(TableName.valueOf("articles_asset"))
              val put2 = new Put(Bytes.toBytes(index))
              put2.addColumn(Bytes.toBytes("articlesCF"), Bytes.toBytes("content"), Bytes.toBytes(content))
              table2.put(put2)

              connection.close()
            })

          })
        })
      })
    ssc.start()
    ssc.awaitTermination()
  }
}

The UI is a server-rendered web application built with sbt and the Play Framework. The main routes implement three pages, index, search and detail, corresponding to the home page, the search results page and the article detail page. The controller behind these routes:

package controllers

import javax.inject._
import play.api.mvc._

@Singleton
class HomeController @Inject()(cc: ControllerComponents) extends AbstractController(cc) {

  def index = Action {
    Ok(views.html.index("Welcome"))
  }

  def search(keyword: String) = Action {
    val res = HbaseQuery.searchKeyWord(keyword)
      .map(pair => (pair._1, limitStr(HbaseQuery.getArticle(pair._1)), pair._2))
    Ok(views.html.search(res))
  }

  def detail(index: String) = Action {
    val detail = HbaseQuery.getArticle(index)
    Ok(views.html.detail(detail))
  }

  def limitStr(s: String): String = s match {
    case _s if _s.length > 300 => s.substring(0, 300) + "..."
    case _ => s
  }
}
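HbaseQuery is not listed in the post; it wraps the reads against the articles and articles_asset tables written by the streaming job. The sketch below shows the equivalent lookups in Java (for consistency with the earlier examples; the project's actual Scala helper would be analogous). The method names mirror the controller, while the class name and connection handling are assumptions.

import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseQuerySketch {

    private static Connection connect() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zookeeper001,zookeeper002,zookeeper003");
        return ConnectionFactory.createConnection(conf);
    }

    // keyword -> list of (article path, term count), read from the 'articles' index table
    public static List<Map.Entry<String, String>> searchKeyWord(String keyword) throws IOException {
        List<Map.Entry<String, String>> hits = new ArrayList<>();
        try (Connection conn = connect();
             Table index = conn.getTable(TableName.valueOf("articles"))) {
            Result row = index.get(new Get(Bytes.toBytes(keyword)));
            NavigableMap<byte[], byte[]> cols = row.getFamilyMap(Bytes.toBytes("articlesCF"));
            if (cols != null) {
                cols.forEach((path, count) -> hits.add(
                        new AbstractMap.SimpleEntry<>(Bytes.toString(path), Bytes.toString(count))));
            }
        }
        return hits;
    }

    // article path -> full text, read from the 'articles_asset' table
    public static String getArticle(String index) throws IOException {
        try (Connection conn = connect();
             Table assets = conn.getTable(TableName.valueOf("articles_asset"))) {
            Result row = assets.get(new Get(Bytes.toBytes(index)));
            byte[] content = row.getValue(Bytes.toBytes("articlesCF"), Bytes.toBytes("content"));
            return content == null ? "" : Bytes.toString(content);
        }
    }
}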

Program Structure

Running the Demo

First start the web server and open it in a browser to reach the search page:

Start the consumer and let it wait for messages, then start the producer to begin writing. (For easier testing, the two processes are simulated here with threads.) The console output looks like this: tuples of (term, index, term frequency for that index) are persisted to HBase.

Enter a term and start querying; refreshing every few seconds shows new search results appearing (demonstrated in the screen recording).

Click a result to view the full text; Ctrl+F shows the matching keywords.

Inspecting HBase confirms that the index has been generated:
