当前位置: 首页 > news >正文

个人单页网站微信 绑定网站

个人单页网站,微信 绑定网站,在线网站,有域名了怎么建设网站在上一篇博客车联网架构设计(一)_消息平台的搭建-CSDN博客中#xff0c;我介绍了车联网平台需要实现的一些功能#xff0c;并介绍了如何用EMQXHAPROXY来搭建一个MQTT消息平台。车联网平台的应用需要消费车辆发布的消息#xff0c;同时也会下发消息给车辆#xff0c;以实现车…在上一篇博客车联网架构设计(一)_消息平台的搭建-CSDN博客中我介绍了车联网平台需要实现的一些功能并介绍了如何用EMQXHAPROXY来搭建一个MQTT消息平台。车联网平台的应用需要消费车辆发布的消息同时也会下发消息给车辆以实现车辆控制等功能。通常我们会在MQTT消息平台收到车辆消息后对消息进行缓存以供上层应用使用。我们可以直接把消息保存到数据库或者引入一个消息队列这样可以方便对应用和车辆之间进行解耦合。 这里我将介绍一下如何引入一个Kafka消息队列把车辆以及上层应用之间需要交互的消息缓存到这个消息队列之中。 在EMQX的企业版中提供了丰富的数据桥接功能可以支持把MQTT消息桥接到其他外部系统例如Kafka或数据库中。但是在开源版只提供了很有限的数据桥接不支持Kafka。为此我们可以通过给EMQX开发Hook extension的方式来加载我们的插件实现把数据桥接到Kafak。 在EMQX官网的介绍中Hook扩展是通过gRPC的方式来实现的支持多种编程语言。如下图 这里我以Python为例子来定义一个扩展。 搭建Kafka 首先是在K8S上部署一个Kafka集群这里我选择了Strimizi的Kafka operator来部署 先创建一个namespace kubectl create namespace kafka 安装Operator, CRD以及定义RBAC等 kubectl create -f https://strimzi.io/install/latest?namespacekafka -n kafka 创建一个只包含一个节点的Kafka kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka 打开两个终端分别运行以下的订阅和发布的指令测试Kafka是否正常工作 kubectl -n kafka run kafka-producer -ti --imagequay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rmtrue --restartNever -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic kubectl -n kafka run kafka-consumer -ti --imagequay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rmtrue --restartNever -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning 开发ExHook 首先是获取当前EMQX版本定义的gPRC proto。在EMQX服务器的/opt/emqx/lib/emqx_exhook-5.0.14/priv/protos/目录下面有一个exhook.proto文件。 运行以下命令来基于这个proto生成python文件 python -m grpc_tools.protoc -I./ --python_out. --pyi_out. --grpc_python _out. ./exhook.proto 运行之后在当前目录下会新生成三个文件exhook_pb2_grpc.pyexhook_pb2.pyexhook_pb2.pyi 新建一个exhook_server.py文件继承exhook_pb2_grpc里面的HookProviderServicer注册对应事件的处理方法如以下代码 from concurrent import futures import logging from multiprocessing.sharedctypes import Valueimport grpcimport exhook_pb2 import exhook_pb2_grpcimport pickle from kafka import KafkaProducerclass HookProvider(exhook_pb2_grpc.HookProviderServicer):def __init__(self):self.producer KafkaProducer(bootstrap_serversmy-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092)def OnProviderLoaded(self, request, context):print(OnProviderLoaded:, request)specs [exhook_pb2.HookSpec(nameclient.connect),exhook_pb2.HookSpec(nameclient.connack),exhook_pb2.HookSpec(nameclient.connected),exhook_pb2.HookSpec(nameclient.disconnected),exhook_pb2.HookSpec(nameclient.authenticate),exhook_pb2.HookSpec(nameclient.authorize),exhook_pb2.HookSpec(nameclient.subscribe),exhook_pb2.HookSpec(nameclient.unsubscribe),exhook_pb2.HookSpec(namesession.created),exhook_pb2.HookSpec(namesession.subscribed),exhook_pb2.HookSpec(namesession.unsubscribed),exhook_pb2.HookSpec(namesession.resumed),exhook_pb2.HookSpec(namesession.discarded),exhook_pb2.HookSpec(namesession.takenover),exhook_pb2.HookSpec(namesession.terminated),exhook_pb2.HookSpec(namemessage.publish),exhook_pb2.HookSpec(namemessage.delivered),exhook_pb2.HookSpec(namemessage.acked),exhook_pb2.HookSpec(namemessage.dropped)]specs [exhook_pb2.HookSpec(namemessage.publish)]return exhook_pb2.LoadedResponse(hooksspecs)def OnProviderUnloaded(self, request, context):print(OnProviderUnloaded:, request)return exhook_pb2.EmptySuccess()def OnClientConnect(self, request, context):print(OnClientConnect:, request)return exhook_pb2.EmptySuccess()def OnClientConnack(self, request, context):print(OnClientConnack:, request)return exhook_pb2.EmptySuccess()def OnClientConnected(self, request, context):print(OnClientConnected:, request)return exhook_pb2.EmptySuccess()def OnClientDisconnected(self, request, context):print(OnClientDisconnected:, request)return exhook_pb2.EmptySuccess()def OnClientAuthenticate(self, request, context):print(OnClientAuthenticate:, request)reply exhook_pb2.ValuedResponse(typeSTOP_AND_RETURN, bool_resultTrue)return replydef OnClientAuthorize(self, request, context):print(OnClientAuthorize:, request)reply exhook_pb2.ValuedResponse(typeSTOP_AND_RETURN, bool_resultTrue)return replydef OnClientSubscribe(self, request, context):print(OnClientSubscribe:, request)return exhook_pb2.EmptySuccess()def OnClientUnsubscribe(self, request, context):print(OnClientUnsubscribe:, request)return exhook_pb2.EmptySuccess()def OnSessionCreated(self, request, context):print(OnSessionCreated:, request)return exhook_pb2.EmptySuccess()def OnSessionSubscribed(self, request, context):print(OnSessionSubscribed:, request)return exhook_pb2.EmptySuccess()def OnSessionUnsubscribed(self, request, context):print(OnSessionUnsubscribed:, request)return exhook_pb2.EmptySuccess()def OnSessionResumed(self, request, context):print(OnSessionResumed:, request)return exhook_pb2.EmptySuccess()def OnSessionDiscarded(self, request, context):print(OnSessionDiscarded:, request)return exhook_pb2.EmptySuccess()def OnSessionTakenover(self, request, context):print(OnSessionTakenover:, request)return exhook_pb2.EmptySuccess()def OnSessionTerminated(self, request, context):print(OnSessionTerminated:, request)return exhook_pb2.EmptySuccess()def OnMessagePublish(self, request, context):self.producer.send(testtopic, pickle.dumps(nmsg))reply exhook_pb2.ValuedResponse(typeSTOP_AND_RETURN, messagenmsg)return reply## case2: stop publish the t/d messages#def OnMessagePublish(self, request, context):# nmsg request.message# if nmsg.topic t/d:# nmsg.payload b# nmsg.headers[allow_publish] bfalse## reply exhook_pb2.ValuedResponse(typeSTOP_AND_RETURN, messagenmsg)# return replydef OnMessageDelivered(self, request, context):print(OnMessageDelivered:, request)return exhook_pb2.EmptySuccess()def OnMessageDropped(self, request, context):print(OnMessageDropped:, request)return exhook_pb2.EmptySuccess()def OnMessageAcked(self, request, context):print(OnMessageAcked:, request)return exhook_pb2.EmptySuccess()def serve():server grpc.server(futures.ThreadPoolExecutor(max_workers10))exhook_pb2_grpc.add_HookProviderServicer_to_server(HookProvider(), server)server.add_insecure_port([::]:9000)server.start()print(Started gRPC server on [::]:9000)server.wait_for_termination()if __name__ __main__:logging.basicConfig()serve() 解释一下代码在OnProvidedLoader里面是加载各种事件的钩子这里只加载message.publish事件。在OnMessagePublish是对应事件的处理函数这里把收到的MQTT消息通过Pickle进行序列化发送到Kafka的对应topic 部署ExHook 写一个Dockerfile把代码打包为一个镜像 FROM python:3.7-slim WORKDIR /app COPY requirements.txt ./ RUN pip install -r requirements.txt COPY . . CMD [python, ./exhook_server.py] requirements.txt文件内容为 grpcio1.59.3 grpcio-tools1.59.3 kafka-python2.0.2 运行以下命令来构建镜像 docker build --networkhost -t emqx_plugin_test:v1.0 . 创建一个部署这个镜像的deployment和service然后部署到K8S apiVersion: apps/v1 kind: Deployment metadata:name: emqx-hookserver-deploymentlabels:app: hookservernamespace: emqx spec:replicas: 1selector:matchLabels:app: hookservertemplate:metadata:labels:app: hookserverspec:containers:- name: hookserverimage: emqx_plugin_test:v1.0imagePullPolicy: Neverresources:requests:memory: 250Micpu: 100mlimits:memory: 250Micpu: 100mports:- name: rpccontainerPort: 9000 --- apiVersion: v1 kind: Service metadata:name: hookserver-servicenamespace: emqx spec:selector:app: hookserverports:- name: rpcport: 9000 回到EMQX的控制面板Dashboard在ExHook里面添加url填入http://hookserver-service.emqx.svc.cluster.local:9000然后选择启用即可可以看到状态为连接成功并且显示注册了1个钩子。 在minikube上部署一开始是显示连接中等了很久仍然无法连接成功最后查了资料原来是coredns的问题运行以下命令重启即可 kubectl -n kube-system rollout restart deployment coredns 之后打开订阅Kafka的testtopic然后通过MQTT连接到EMQX发送消息可以看到Kafka能成功收到EMQX转发的消息。
http://wiki.neutronadmin.com/news/137988/

相关文章:

  • 做网站客户给不了素材win7 iis配置本地网站
  • 网站建设取得了购买空间网站哪个好
  • 建设工程质量协会网站俄罗斯网站建设
  • 做网站怎样实现网上支付怎么样做小程序
  • jsp网站建设作业dedecms 门户网站制作
  • 贵阳市乌当区住房与城乡建设局网站娃哈哈网络营销策划方案
  • wordpress企业网站开发视频网站后台
  • 威海高区有没有建设局的网站网站开发简答题
  • 做线上网站的风险分析网站设计 版权
  • 广东建设工程招标网站专业建设 验收 网站
  • 代理网站开发金山网站安全检测
  • php访问网站网站建设 广州佛山
  • 网站迁移后 后台进不去简单网站建设方案策划
  • 深圳积分商城网站制作怎样做类似淘宝的网站
  • 深圳建站软件红酒首页网页设计素材
  • 制作网站的软件有那个免费我的世界做头像的网站
  • 网站开发计入什么会计科目网络运营招聘
  • 机械技术支持 东莞网站建设制作网站和制作网页的分别
  • 定制公司网站百度指数人群画像哪里查询
  • 建服务网站需要多少钱遵义软件制作平台
  • 旅游信息网站开发背景陕西网站建设优化建站
  • 北京市建设厅门户网站wordpress回复邮件
  • 画品展现手机网站android wap网站
  • 建设通网站上的业绩能否有用wordpress钩子自定义钩子
  • 下载建设银行官方网站下载免费发布信息
  • 百度收录好的网站排名专业开发软件的公司
  • 大连坐做网站公司品牌宣传片策划公司
  • 页游网站网页设计作品集展示
  • 小店网站制作wordpress 云
  • 东莞网站关键词郑州最新消息今天