当前位置: 首页 > news >正文

瑞丽网站建设代理做减肥网站

瑞丽网站建设,代理做减肥网站,软件开发的就业前景,中国免费建设网站网址1.摘要 在上一章节中,我利用Docker快速搭建了一个Kafka服务,并测试成功Kafka生产者和消费者功能,本章内容尝试在Go项目中对Kafka服务进行封装调用, 实现从Kafka自动接收消息并消费。 在本文中使用了Kafka的一个高性能开源库Sarama, Sarama是一个遵循MIT许可协议的Apache Kafk…1.摘要 在上一章节中,我利用Docker快速搭建了一个Kafka服务,并测试成功Kafka生产者和消费者功能,本章内容尝试在Go项目中对Kafka服务进行封装调用, 实现从Kafka自动接收消息并消费。 在本文中使用了Kafka的一个高性能开源库Sarama, Sarama是一个遵循MIT许可协议的Apache Kafka Go客户端库, 该开源库地址为:GitHub - IBM/sarama: Sarama is a Go library for Apache Kafka.。 2.功能结构组织 为了能在项目中快速使用, 我在项目目录中专门新建了一个名为kafka的文件夹,在该文件夹下新建了四个文件,分别为: kafka 目录)|----- consumer.go (消费者方法实现)|----- producer.go (生产者方法实现)|----- kafka.go (定义接口)|----- kafka_test.go (单元功能测试) 为方便项目使用,在此基础上做了二次封装。 3.消费者实现 第一步首先定义了一个结构体, 里面包含了Kafka的主机、topic、接收通道和消费者对象信息: type KafkaConsumer struct {Hosts string // Kafka主机IP:端口,例如:192.168.201.206:9092Ctopic string // topic名称Kchan chan string // 接收信息通道Consumer sarama.Consumer // 消费者对象 } 接下来是消费者初始化函数: func (k *KafkaConsumer) kafkaInit() {// 定义配置选项 config : sarama.NewConfig()config.Consumer.Return.Errors trueconfig.Version sarama.V0_10_2_0// 初始化一个消费对象consumer, err : sarama.NewConsumer(k.Hosts, config)if err ! nil {err errors.New(NewConsumer错误,原因: err.Error())fmt.Println(err.Error())return}// 获取所有Topictopics, err : consumer.Topics()if err ! nil {fmt.Println(err.Error())return}// 判断是否有自定义的Topicvar topicsName for _, e : range topics {if e k.Ctopic {topicsName ebreak}}// 没有自定义的Topic则报错if topicsName {err errors.New(找不到topics内容)fmt.Println(err.Error())return}// 将消费对象保存到结构体以备后面使用k.Consumer consumer } 在上面的初始化函数中, 首先初始化一个消费对象, 然后获取所有的Topic名称,并判断了在这些Topic名称中是否有我自定义的名称,获取成功后则将消费对象保存到我们绑定的结构体中。 接下来是消费监控函数实现,代码如下: func (k *KafkaConsumer) kafkaProcess() {var wg sync.WaitGroup// 遍历指定Topic分区持续监控消息Partitions, _ : k.Consumer.Partitions(k.Ctopic)for _, subPartitions : range Partitions {pc, err : k.Consumer.ConsumePartition(k.Ctopic, subPartitions, sarama.OffsetNewest)if err ! nil {continue}wg.Add(1)go func() {defer wg.Done()// 这里进入另一个函数可以过滤消息内容k.processPartition(pc)}()}wg.Wait() } 函数processPartition()的实现代码如下: func (k *KafkaConsumer) processPartition(pc sarama.PartitionConsumer) {defer pc.AsyncClose()for msg : range pc.Messages() {// 这里可以过滤不需要的Topic的信息if strings.Contains(string(msg.Value), group_state2) {continue}// 这里将获取到的Topic信息发送到通道k.Kchan - string(msg.Value)} } 4.生产者实现 为了跟消费者代码配套,这里也同步实现了生产者代码,主要功能是完成工作后,给指定Topic的生产方返回一个指定消息。 定义生产者的结构体如下: type KafkaProducer struct {hosts string // Kafka主机sendmsg string // 消费方返回给生产方的消息ptopic string // TopicAsyncProducer sarama.AsyncProducer // Kafka生产者接口对象 } 对应的生产者初始化函数实现如下: func (k *KafkaProducer) kafkaInit() {// 定义配置参数config : sarama.NewConfig()config.Producer.RequiredAcks sarama.WaitForAllconfig.Producer.Retry.Max 5config.Producer.Return.Successes trueconfig.Version sarama.V0_10_2_0// 初始化一个生产者对象producer, err : sarama.NewAsyncProducer(k.hosts, config)if err ! nil {err errors.New(NewAsyncProducer错误,原因: err.Error())fmt.Println(err.Error())return}// 保存对象到结构体k.AsyncProducer producer } 给生产者回复信息的函数实现如下: func (k *KafkaProducer) kafkaProcess() {msg : sarama.ProducerMessage{Topic: k.ptopic,}// 信息编码msg.Value sarama.ByteEncoder(k.sendmsg)// 将信息发送给通道k.AsyncProducer.Input() - msg } 5.接口定义实现 首先对于生产者和消费者,都有对应的初始化和执行操作,因此定义接口函数如下: // Kafka方法接口 type IKafkaMethod interface {kafkaInit() // 初始化方法kafkaProcess() // 执行方法 } 为了方便管理接口的赋值操作, 这里定义了一个接口管理方法, 并用Set()函数进行接口类型赋值, Run()函数负责运行对应的成员函数: // 接口管理结构体 type KafkaManager struct {kafkaMethod IKafkaMethod // 接口对象 }// 定义实现Set方法 func (km *KafkaManager) Set(m IKafkaMethod) {km.kafkaMethod m // 将指定的方法赋给接口 }// 定义实现Run方法 func (km *KafkaManager) Run() {km.kafkaMethod.kafkaInit()go km.kafkaMethod.kafkaProcess() } 最后一部分是供外部调用的函数,首先定义一个结构体,该结构体中保存了Kafka的基础信息和三个对象指针: type KafkaMessager struct {KafkaManager *KafkaManager // 接口管理对象指针KafkaProducer *KafkaProducer // 生产者对象指针KafkaConsumer *KafkaConsumer // 消费者对象指针Hosts string // Kafka主机topic string // topic }// 供外部调用初始化的函数,传入Kafka主机IP和Topic,返回操作对象指针,并初始化结构体成员变量 func NewKafkaMessager(hosts, topic string) *KafkaMessager {km : KafkaMessager{KafkaManager: new(KafkaManager),KafkaProducer: new(KafkaProducer),KafkaConsumer: new(KafkaConsumer),Hosts: hosts,topic: topic,}return km } 6.功能调用和验证 在Kafka_test.go文件中,定义一个用于单元测试的函数,格式如下: func TestKafka(t *testing.T) {.... } 使用单元测试函数的好处是可以单独调试, 专注核心功能本身。 我使用的编辑器是Goland, 在TestKafka函数前面有个三角形小图标,点击可以选择各种调试选项,如图: 下面是我模拟用户调用的客户端代码片段: // 这里选择我自己搭建的Kafka所在服务器,Topic为test123 // 注意:这里的hosts格式是IP:端口的格式,例如:192.168.201.206:9092 hosts : 192.168.201.206:9092 topic : test123// 调用初始化函数,并将上面的内容作为参数传进去 nkm : NewKafkaMessager(hosts, topic)// 初始化消费者,当生产者发出消息,消费者自动消费 nkm.KafkaConsumer.Hosts hosts // 消费者host赋值 nkm.KafkaConsumer.Ctopic topic // 消费者topic赋值 nkm.KafkaConsumer.Kchan make(chan string) // 初始化消息通道 nkm.KafkaManager.Set(nkm.KafkaConsumer) // 接口赋值,设置成操作消费者方法 nkm.KafkaManager.Run() // 执行消费者初始化方法// 监听通道,接收生产客户端发过来的消息 recv : - nkm.KafkaConsumer.Kchan fmt.Println(recv) // 打印接收到的消息 现在我们可以选择直接运行程序了,然后在Kafka的生产者控制台中输入字符:Hello,Goland发送: 可以看到,我们的程序成功接收到Kafka生产者发送过来的信息。 --- END --
http://wiki.neutronadmin.com/news/49824/

相关文章:

  • 银川网站建设广告公司wordpress 安装ssl
  • 打造自己的网站南川区 网站集约化建设方案
  • 学校网站空间建设情况wordpress直播购物插件
  • 深圳做网站(信科网络)网站友情链接模板
  • 如何设置中国建设银行网站外贸平台有哪些国际
  • 商业网站建设设计郑州网站设计收费
  • 合肥公司网站建设价格上海做网站哪家公司好
  • 学校网站建设文字规范问题工商注册地址有什么要求
  • 网站权重分为几个等级网站建设的组织机构
  • 手机网站自适应分辨率福州seo顾问
  • 云主机建设网站网站开发简单
  • 站长工具seo综合查询是什么意思网站开发定制案例展示
  • 衡阳做网站的公司网站做轮播图的意义
  • 网站设计手机闵行区属于浦东还是浦西
  • 网站建设管理工作简述wordpress常常被用来做什么网站
  • 企业网站建设太原网站建设wordpress做菜鸟教程
  • 专业建站公司的业务内容有哪些泉州网站建设公司首选公司
  • 网站现状如何分析平台推广怎么写
  • 上蔡做网站毕设做网站什么主题比较好
  • 财经网站源码 织梦网站开发 鲁山
  • 牛杂网这类网站怎么做的数商云网络科技有限公司
  • 张家界简单的网站建设二级医院做网站
  • 网站的动画广告横幅怎么做的优化网站
  • 帮忙做任务网站分销商城系统有哪些
  • 网站建设项目报价单赣州91人才网赣州招聘
  • 第一次做网站选多大空间长沙建网站培训
  • 博达网站建设教程分销商管理系统
  • 济宁网站建设推荐广州哪里能看海
  • 如何做网站的二级页面c .net 做网站
  • WordPress电影资源分享下载站外贸网络营销定价策略