微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

nginx+lua+strom+kafka实现热点数据实时展示,实时统计热点数据展示出来

kafka aide_941 32℃
假设最近一段时间内有10万个请求过来,其中1万次请求访问商品1,2万次请求访问商品2,3万次请求访问商品3,其他商品被访问次数均匀。请实时统计热点数据展示出来。
说下主要的解决步骤:
1.分发层nginx将所有的请求hash分发到对应的应用层nginx,防止第一次请求落到了应用层nginx1,访问数据库,加了点缓存,第二次请求落到了nginx2,缓存没有,照样访问数据库了,因此,需要保证productid=1的商品请求只会落到对应的应用层nginx上处理。
2.需要三级缓存架构,应用层nginx本地缓存+ 缓存生产服务(redis缓存+enchache缓存)
3.数据库修改后,需要发送一条数据修改消息给队列,缓存生产服务监听到这个消息,会自主的更新缓存
4.用户请求到达应用层nginx后,即使缓存被命中了,也需要统计出来用户的这次访问请求吧,因此,每请求一次就需要用nginx+lua向kafka发送一条自定义的消息,用于后续请求处理。
5.strom从kafka队列消费消息,并统计出来每个商品的访问次数,并统计出来前5名的商品productid,将productidlist
的信息分段存储在zookeeper中。
6.在做热储备的时候,需要从zookeeper中获取分布式锁,读取节点信息,写入到缓存生产服务中去。
1.首先是请求分发,需要nginx+lua脚本,分发nginx在收到请求的时候,计算出hash值,发送一个http请求到应用层上去处理。
这句话(local uri_args = ngx.req.get_uri_args())的意思是:
拿到url请求地址的参数
local uri_args = ngx.req.get_uri_args()
local productId = uri_args[“productId”]
定义我们需要分发的应用层服务器ip地址,对商品id进行取模,得到选用的服务器的ip地址,字符串拼接使用..
注意:2不是写死的,代表的是备用服务器ip地址的个数,hash%2不是0就是1,那么加1得到的就是1,2,lua脚本中的local host不同于数组下标是从0开始的
local host = {“192.168.31.19”, “192.168.31.187”}
local hash = ngx.crc32_long(productId)
local index = (hash % 2) + 1
local backend = “http://”..host[index]
对于method的处理,requestPath是不带参数也不带host的请求地址,/hello,请求拼接就是requestPath+参数
local requestPath = uri_args[“requestPath”]
requestPath = “/”..requestPath..”?productId=”..productId
因为需要从nginx向应用nginxf发送一个http请求,因此需要引用我们在lua安装包后又手动加入的http包,创建一个http请求
local http = require(“resty.http”)
local httpc = http.new()
创建一个get的http请求,指明机器地址和请求
local resp, err = httpc:request_uri(backend, {
method = “GET”,
path = requestPath
})
如果没有响应,返回请求错误,否则返回请求结果,关闭http连接
if not resp then
ngx.say(“request error :”, err)
return
end
ngx.say(resp.body)
httpc:close()
2.需要三级缓存架构
3.数据库修改后,缓存数据生产服务自动更新
应用层nginx收到请求后,先找本地nginx缓存,再去生产服务查缓存,数据生产服务就是一个redis+encache的jar包,加到项目中可以直接使用缓存。应用层nginx找本地缓存,再找生产服务那块用lua脚本处理。
#得到请求的参数
local uri_args = ngx.req.get_uri_args()
local productId = uri_args[“productId”]
local shopId = uri_args[“shopId”]
#得到nginx的本地共享缓存,my_cache就是我们定义的nginx缓存块的名字
local cache_ngx = ngx.shared.my_cache
#定义在nginx存储的key的名字
local productCacheKey = “product_info_”..productId
local shopCacheKey = “shop_info_”..shopId
#试图首次查找缓存,为空
local productCache = cache_ngx:get(productCacheKey)
local shopCache = cache_ngx:get(shopCacheKey)
#如果没有找到本地缓存,那么创建一个http请求,发送一个http请求,179是对应的缓存生产服务所在的机器的地址,
#path表示实际缓存生产服务的请求路径地址,然后得到返回结果,将结果存储到本地本地缓存(cache_ngx)中,指定时间10分钟
if productCache == “” or productCache == nil then
local http = require(“resty.http”)
local httpc = http.new()
local resp, err = httpc:request_uri(“http://192.168.31.179:8080”,{
method = “GET”,
path = “/getProductInfo?productId=”..productId
})
productCache = resp.body
cache_ngx:set(productCacheKey, productCache, 10 * 60)
end
if shopCache == “” or shopCache == nil then
local http = require(“resty.http”)
local httpc = http.new()
local resp, err = httpc:request_uri(“http://192.168.31.179:8080”,{
method = “GET”,
path = “/getShopInfo?shopId=”..shopId
})
shopCache = resp.body
cache_ngx:set(shopCacheKey, shopCache, 10 * 60)
end
#得到的是json字符串,我们把字符串变成son对象
local cjson = require(“cjson”)
local productCacheJSON = cjson.decode(productCache)
local shopCacheJSON = cjson.decode(shopCache)
#定义个对象,赋值json对象应该显示到页面上的内容
local context = {
productId = productCacheJSON.id,
productName = productCacheJSON.name,
productPrice = productCacheJSON.price,
productPictureList = productCacheJSON.pictureList,
productSpecification = productCacheJSON.specification,
productService = productCacheJSON.service,
productColor = productCacheJSON.color,
productSize = productCacheJSON.size,
shopId = shopCacheJSON.id,
shopName = shopCacheJSON.name,
shopLevel = shopCacheJSON.level,
shopGoodCommentRate = shopCacheJSON.goodCommentRate
}
#创建模板对象,渲染到html页面中,(resty.template不是模板文件位置,而是创建所需的模板jar包的位置)
#product.html是存在的模板文件
local template = require(“resty.template”)
template.render(“product.html”, context)
4.每收到一条请求就用nginx+lua向kafka发送一条自定义的消息,用于后续请求处理
#创建kafka发送者也就是需要lua安装包下的那个kafa-lua交互的jar包
local cjson = require(“cjson”)
local producer = require(“resty.kafka.producer”)
#写死nginx地址列表
local broker_list = {
{ host = “192.168.31.187”, port = 9092 },
{ host = “192.168.31.19”, port = 9092 },
{ host = “192.168.31.227”, port = 9092 }
}
#自定义需要发送给kafka的消息,json字符串
local log_json = {}
log_json[“headers”] = ngx.req.get_headers()
log_json[“uri_args”] = ngx.req.get_uri_args()
log_json[“body”] = ngx.req.read_body()
log_json[“http_version”] = ngx.req.http_version()
log_json[“method”] =ngx.req.get_method()
log_json[“raw_reader”] = ngx.req.raw_header()
log_json[“body_data”] = ngx.req.get_body_data()
local message = cjson.encode(log_json);
local productId = ngx.req.get_uri_args()[“productId”]
#异步发送消息,确保根据productid进行机器对应分区发送
local async_producer = producer:new(broker_list, { producer_type = “async” })
local ok, err = async_producer:send(“access-log”, productId, message)
if not ok then
ngx.log(ngx.ERR, “kafka send err:”, err)
return
end
5.strom从kafka队列消费消息,并统计出来每个商品的访问次数
strom设计到几个概念,spout-bolt-wroker-task-拓扑,基础版本的拓扑是这么来的,比如一个问题,实时统计单词出现次数
5.1spout不断模拟发送数据(句子),并转发给bolt
5.2 bolt收到数据,把数据拆成单词,并把每个单词都转发给下一个bolt
5.3 计数的blot收到单词后开始统计单词数量
5.4 拓扑结构的创建和main测试方法
那么,换到这个功能也是一样的,一个spout不断从kafka消费自定义的消息,表示有个商户请求过来,访问的是商品几的消息,并转发给下一个bolt
一个bolt解析这个消息,获取消息中的productid并转发给下一个bolt
一个bolt获取到productid,并且计数,将数据放到lrumap中,这个map如果你初始化为100,那么只会存储最近请求的100种商品,注意不是100次请求,map计数value是累加的
6.统计出前N名的商品id,然后分布式存储到zk中,意思也就是和taskid有关,你设置了拓扑结构中最多有2个task,
存储在zk中的taskidlist就会是5,4 ,每个task都会计算一个前N的排名,然后放到zk对应的task节点上
7从zookeeper中获取分布式锁的内容放到缓存生产服务中
这里主要是获取zk数据状态锁,taskid锁的双锁后往redis中插入数据。

转载请注明:SuperIT » nginx+lua+strom+kafka实现热点数据实时展示,实时统计热点数据展示出来

喜欢 (0)or分享 (0)