赣州市建设工程质量监督管理站网站,360推广,cms做的网站胡源代码,个人网站站长Kafka适合什么样的场景?
它可以用于两大类别的应用:
构造实时流数据管道#xff0c;它可以在系统或应用之间可靠地获取数据。 (相当于message queue)构建实时流式应用程序#xff0c;对这些流数据进行转换或者影响。 (就是流处理#xff0c;通过kafka stream topic和topi…Kafka适合什么样的场景?
它可以用于两大类别的应用:
构造实时流数据管道它可以在系统或应用之间可靠地获取数据。 (相当于message queue)构建实时流式应用程序对这些流数据进行转换或者影响。 (就是流处理通过kafka stream topic和topic之间内部进行变化)
Kafka中文文档http://kafka.apachecn.org/
1系统环境
a操作系统 CentOS Linux release 7.6.1810 (Core) 64位必须确保你的内存是4G以上双核CPU否则将无法新建默认命名空间。
b确保jdk环境已经安装具体教程请看 CentOS7 shell脚本安装jdk
c确保Python3和对应的pip已经安装具体教程请看 CentOS7 源码编译安装Python3.5
2执行以下命令安装Kafka并启动
wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz # 下载kafka 1.0.0安装包
tar -zxvf kafka_2.11-1.0.0.tgz # 解压安装包
cd kafka_2.11-1.0.0/ # 打开kafka目录
sh bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # 后台运行zookeeper
sh bin/kafka-server-start.sh config/server.properties # 运行kafka服务出现 “started” 则是启动成功 3执行以下命令创建test5的topic
sh bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test5 4执行以下命令创建监听test5的消息队列程序
sh bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test5 --from-beginning
5执行以下命令建发送test5消息队列的生产者程序
sh bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test5
生产者发送一些消息回车 看到消费者界面出现生产者的消息 6Python3安装kafka依赖包执行命令 “pip3 install kafka”
依赖包安装完成后创建python3消费者监听程序kafka_consumer.py
from kafka import KafkaConsumer
consumer KafkaConsumer(test4, bootstrap_servers[localhost:9092])
for msg in consumer:recv %s:%d:%d: key%s value%s % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)print(recv)
运行 python3 kafka_consumer.py 创建python3生产消息队列程序kafka_producer.py
import json
from kafka import KafkaProducer
producer KafkaProducer(bootstrap_serverslocalhost:9092)
msg_dict {sleep_time: 10,db_config: {database: test_1,host: xxxx,user: root,password: root},table: msg,msg: Hello World
}
msg json.dumps(msg_dict)
producer.send(test4, bytes(msg,ascii), partition0)
producer.close()
运行 “python3 kafka_producer.py”转到kafka_consumer.py运行界面看到已接收到生产程序发送过来的信息