给公司做宣传网站的好处,报电子商务(网站建设与运营),网络怎么推广,广州市网站建设服务机构组内做大数据#xff0c;需要kafka写入数据#xff0c;最近在看python正好#xff0c;练练手#xff0c;网上找了一圈#xff0c;都是用的pykafka#xff0c;经过一整圈的安装#xff0c;最终搞定#xff0c;代码如下#coding:u8import sysimport timeimport randomimpo… 组内做大数据需要kafka写入数据最近在看python正好练练手网上找了一圈都是用的pykafka经过一整圈的安装最终搞定代码如下#coding:u8import sysimport timeimport randomimport datetimeimport MySQLdbimport codecsfrom pykafka import KafkaClientimport loggingimport jsonimport threading******************ad[]try:inifile(set.txt)adini.readline().splitlines()ini.closeexcept Exception as e:print open settings file Error:,type(e)ad[192.168.1.121:9092]print open ini filetry:client KafkaClient(hosts ad[0])print Topics:,client.topicstopic client.topics[mytopic]except Exception as e:print Opening kafka Error:%s %(type(e))sys.exit(1)print before threadingtry:with tp.get_sync_producer() as producer:producer.produce(str(dct2))except Exception as e:print Error: ,type(e)print ini consumerwhile 11:print nn,type(consumer)for message in consumer:print mmif message is not None:print message.offset, message.valueexcept Exception as e:print e,type(e)运行结果可以列出topic写入的数据也没有报错信息。但是消费者取不到数据无论是kafka直接取还是python写消费者代码。后来采用了 kafkapython 正常代码如下#coding:utf-8import sysimport timeimport randomimport datetimeimport codecsimport kafka.kafkaProducerimport loggingimport jsonimport threadingad[]try:inifile(set.txt)adini.readline().splitlines()ini.closeexcept Exception as e:ad[192.168.1.121:9092,192.168.1.122.9092]#print open settings file Error:%d,%s %(e.args[0],e.args[1])print Opening settings file Error:,e,type(e)print Opened ini filetry:client KafkaClient(hosts ad[0])print Topics:,client.topicstopic client.topics[mytopic]except Exception as e:print Opening kafka Error:%s %(e.args[0])sys.exit(1)print before threadingtry:producer KafkaProducer(bootstrap_serversad[0], value_serializerlambda m: json.dumps(m).encode(utf-8))except Exception as e:print Opening kafka Error:,e,type(e)sys.exit(1)print before threadingthreads[]for i in range(0,12):try:threads.append(threading.Thread(targettf,args(producer,i)))threads[i].start()except Exception as e:print Treand error at Thread:%d:%s,%s %(i,e,type(e))print main thread is ended代码均有所节略。