Without further ado: first a walkthrough of the code, then my own implementation. If you spot any mistakes, corrections are welcome.

Import the two key dependencies. Any other packages your project needs, add them yourself; my pom pulls in far too many to dump here all at once.

```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>${storm.version}</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
```

I am connecting to Redis running on a Linux host, so Redis has to be configured first, otherwise you will get "connection refused" errors. When Redis is deployed on Linux and Java connects remotely, make the following changes.

Edit redis.conf:
1. Comment out `bind 127.0.0.1` (i.e. `#bind 127.0.0.1`) so that hosts other than localhost can reach Redis.
2. Change `protected-mode yes` to `protected-mode no` to turn off protected mode.
3. Change `daemonize no` to `daemonize yes` so the Redis service runs in the background.

Open the port in the firewall:
1. Register Redis's default port 6379: `/sbin/iptables -I INPUT -p tcp --dport 6379 -j ACCEPT`
2. Save the firewall rules: `/etc/rc.d/init.d/iptables save`
3. Restart the firewall: `/etc/rc.d/init.d/iptables restart`
4. Check the firewall status: `/etc/rc.d/init.d/iptables status`

Use a small test class to verify that the connection actually works:

```java
import java.util.Iterator;
import java.util.Set;

import redis.clients.jedis.Jedis;

/**
 * @author cwc
 * @date 2018-05-30
 * @version 1.0.0
 */
public class RedisTest {
    public static void main(String[] args) {
        // connect to the remote Redis service
        Jedis jedis = new Jedis("xxx.xx.xxx.xx");
        System.out.println("Connected");
        // check whether the server is alive
        System.out.println("Server is running: " + jedis.ping());
        // fetch all keys and print them
        Set<String> keys = jedis.keys("*");
        Iterator<String> it = keys.iterator();
        while (it.hasNext()) {
            String key = it.next();
            System.out.println(key);
        }
    }
}
```

With that ready, let's first look at writing from Storm into Redis. The store API example given by the official documentation:

```java
class WordCountStoreMapper implements RedisStoreMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountStoreMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getStringByField("count");
    }
}

// This is how the store bolt is created; it gets wired in when building the TopologyBuilder.
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(host).setPort(port).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
```

Honestly, the first time I read this I was completely lost; it took quite a bit of digging before it made sense.
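What finally made it click for me was restating the store bolt in plain Jedis terms: with a HASH RedisDataTypeDescription, every incoming tuple effectively becomes one HSET, where hashKey is the Redis key, getKeyFromTuple supplies the hash field, and getValueFromTuple supplies that field's value. Below is a minimal sketch of that equivalent operation, an illustration rather than the bolt's actual source; the host, sample word, and count are placeholders.

```java
import redis.clients.jedis.Jedis;

// Rough per-tuple equivalent of RedisStoreBolt with the HASH data type
// (illustration only, not the real bolt implementation).
public class StoreEquivalentSketch {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("xxx.xx.xxx.xx", 6379); // placeholder host
        String hashKey = "wordCount"; // key passed to RedisDataTypeDescription
        String key = "apple";         // what getKeyFromTuple(tuple) would return
        String value = "3";           // what getValueFromTuple(tuple) would return
        jedis.hset(hashKey, key, value); // one HSET per stored tuple
        jedis.close();
    }
}
```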
Below is my own code. First, the spout that produces fake data:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * @author cwc
 * @date 2018-05-29
 * Fake data source.
 * @version 1.0.0
 */
public class RedisWriteSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector spoutOutputCollector;

    /**
     * Emitted as the field "word".
     */
    private static final Map<Integer, String> LASTNAME = new HashMap<>();
    static {
        LASTNAME.put(0, "anderson");
        LASTNAME.put(1, "watson");
        LASTNAME.put(2, "ponting");
        LASTNAME.put(3, "dravid");
        LASTNAME.put(4, "lara");
    }

    /**
     * Emitted as the field "myValues".
     */
    private static final Map<Integer, String> COMPANYNAME = new HashMap<>();
    static {
        COMPANYNAME.put(0, "abc");
        COMPANYNAME.put(1, "dfg");
        COMPANYNAME.put(2, "pqr");
        COMPANYNAME.put(3, "ecd");
        COMPANYNAME.put(4, "awe");
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        final Random rand = new Random();
        int randomNumber = rand.nextInt(5);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        spoutOutputCollector.emit(new Values(LASTNAME.get(randomNumber), COMPANYNAME.get(randomNumber)));
        System.out.println("Data incoming");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // emit the fields "word" and "myValues"
        declarer.declare(new Fields("word", "myValues"));
    }
}
```

Then the store mapper:

```java
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.tuple.ITuple;

/**
 * @author cwc
 * @date 2018-05-30
 * @version 1.0.0
 */
public class RedisWriteMapper implements RedisStoreMapper {
    private static final long serialVersionUID = 1L;
    private RedisDataTypeDescription description;
    // this is the key of the hash in Redis
    private final String hashKey = "mykey";

    public RedisWriteMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public String getKeyFromTuple(ITuple ituple) {
        // this becomes the field name inside the Redis hash
        return ituple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple ituple) {
        // this becomes the value stored under that field name
        return ituple.getStringByField("myValues");
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }
}
```
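Before moving on to the read side, it is worth confirming what the write topology actually put into Redis. A quick sanity-check sketch with plain Jedis: the host is a placeholder, and "mykey" is the hash key used by RedisWriteMapper above.

```java
import java.util.Map;

import redis.clients.jedis.Jedis;

// Sanity check: dump the hash that the write topology should have filled.
public class RedisWriteCheck {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("xxx.xx.xxx.xx", 6379); // placeholder host
        Map<String, String> stored = jedis.hgetAll("mykey"); // hash key from RedisWriteMapper
        for (Map.Entry<String, String> entry : stored.entrySet()) {
            // expect entries such as anderson -> abc, watson -> dfg, ...
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
        jedis.close();
    }
}
```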
Now for Storm reading data out of Redis. The lookup API example given by the official documentation:

```java
class WordCountRedisLookupMapper implements RedisLookupMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountRedisLookupMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        String member = getKeyFromTuple(input);
        List<Values> values = Lists.newArrayList();
        values.add(new Values(member, value));
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("wordName", "count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}

JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(host).setPort(port).build();
RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
```

And my own code. The spout that drives the lookups:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * @author cwc
 * @date 2018-05-30
 * @version 1.0.0
 */
public class RedisReadSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector spoutOutputCollector;

    /**
     * These are the values written earlier as the "word" field;
     * we use them to look up the values we stored.
     */
    private static final Map<Integer, String> LASTNAME = new HashMap<>();
    static {
        LASTNAME.put(0, "anderson");
        LASTNAME.put(1, "watson");
        LASTNAME.put(2, "ponting");
        LASTNAME.put(3, "dravid");
        LASTNAME.put(4, "lara");
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        final Random rand = new Random();
        int randomNumber = rand.nextInt(5);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        spoutOutputCollector.emit(new Values(LASTNAME.get(randomNumber)));
        System.out.println("Read request incoming");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // emit the field "word"
        declarer.declare(new Fields("word"));
    }
}
```

The lookup mapper:

```java
import java.util.List;

import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;

import com.google.common.collect.Lists;

/**
 * @author cwc
 * @date 2018-05-30
 * @version 1.0.0
 */
public class RedisReadMapper implements RedisLookupMapper {
    private static final long serialVersionUID = 1L;
    // describes which Redis data type we are reading
    private RedisDataTypeDescription description;
    // the key of the hash you want to read from -- the same key we stored under above
    private final String hashKey = "mykey";

    /**
     * The data is stored in Redis as a hash: hashKey is the root key, and the key
     * obtained from getKeyFromTuple selects the field inside that hash, i.e.
     * key1 -> { key2 -> value, ... }, one value per inner key. Internally the lookup is
     * lookupValue = jedisCommand.hget(additionalKey, key);
     */
    public RedisReadMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        // the field name handed to us by the spout
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // The hash lookup goes from hashKey through the "word" field down to its value,
        // a bit like row / column family / value in HBase; we emit both key and value.
        declarer.declare(new Fields("word", "values"));
    }

    /**
     * Wrap the looked-up data into a list of Values and return it.
     */
    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        String member = getKeyFromTuple(input);
        List<Values> values = Lists.newArrayList();
        // Two values go downstream here, which is why two field names are declared above.
        values.add(new Values(member, value));
        return values;
    }
}
```

And a bolt that simply prints what was fetched:

```java
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

/**
 * @author cwc
 * @date 2018-05-30
 * Prints the data fetched from Redis.
 * @version 1.0.0
 */
public class RedisOutBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        // String str = tuple.getString(0); // the "word" field
        String strs = tuple.getString(1);   // the looked-up value
        System.out.println(strs);
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("RedisOutBolt"));
    }
}
```
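To recap the read path before the main class: for each tuple from RedisReadSpout, RedisLookupBolt takes the "word" field, resolves it inside the hash "mykey" (effectively an HGET), and hands the result to RedisReadMapper.toTuple, which emits (word, values) on to RedisOutBolt. The sketch below is a rough per-tuple restatement of that in plain Jedis terms, not the bolt's real implementation; the host is a placeholder and "anderson" is just one of the sample words.

```java
import redis.clients.jedis.Jedis;

// Rough per-tuple equivalent of RedisLookupBolt + RedisReadMapper for the HASH type
// (illustration only; the real bolt uses a connection pool and Storm tuples).
public class LookupEquivalentSketch {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("xxx.xx.xxx.xx", 6379); // placeholder host
        String hashKey = "mykey";   // the hash written by the store bolt
        String key = "anderson";    // what getKeyFromTuple(tuple) would return
        String value = jedis.hget(hashKey, key); // lookupValue = hget(additionalKey, key)
        // toTuple(...) would wrap (key, value) into a Values list for the bolt to emit,
        // matching the two declared fields "word" and "values".
        System.out.println(key + " -> " + value);
        jedis.close();
    }
}
```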
Next comes RedisMain, with methods to test writing and reading:

```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.redis.bolt.RedisLookupBolt;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.TopologyBuilder;

public class RedisMain {
    public static void main(String[] args) throws Exception {
        // writeRedis();
        readRedis();
    }

    /**
     * Write to Redis.
     */
    public static void writeRedis() {
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost("xxx.xx.xx.xx").setPort(6379).build();
        System.out.println("Connected");
        RedisStoreMapper storeMapper = new RedisWriteMapper();
        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("RedisWriteSpout", new RedisWriteSpout(), 2);
        builder.setBolt("to-save", storeBolt, 1).shuffleGrouping("RedisWriteSpout");

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        System.err.println("Write finished!!!!!");
        try {
            Thread.sleep(10000);
            // after 10 seconds, kill the topology and shut down the local cluster
            cluster.killTopology("test");
            cluster.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Read from Redis.
     */
    public static void readRedis() {
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost("xxx.xx.xxx.xx").setPort(6379).build();
        RedisLookupMapper lookupMapper = new RedisReadMapper();
        RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("RedisReadSpout-reader", new RedisReadSpout(), 2);
        builder.setBolt("to-lookupBolt", lookupBolt, 1).shuffleGrouping("RedisReadSpout-reader");
        builder.setBolt("to-out", new RedisOutBolt(), 1).shuffleGrouping("to-lookupBolt");

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        try {
            Thread.sleep(100000);
            // after 100 seconds, kill the topology and shut down the local cluster
            cluster.killTopology("test");
            cluster.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
```

Most of the explanations are in the code comments, including notes at the spots where the problems actually came up, so do read through the code carefully. Wishing everyone zero bugs~~
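As a possible next step beyond the LocalCluster test above: to run the same write topology on an actual Storm cluster you would package it into a jar and submit it through StormSubmitter; below is a minimal sketch under that assumption, with an arbitrary worker count and topology name.

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.TopologyBuilder;

// Sketch: submitting the write topology to a real cluster instead of a LocalCluster.
// Typically launched with: storm jar your-topology.jar RedisWriteSubmit
public class RedisWriteSubmit {
    public static void main(String[] args) throws Exception {
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost("xxx.xx.xx.xx").setPort(6379).build(); // placeholder host
        RedisStoreMapper storeMapper = new RedisWriteMapper();
        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("RedisWriteSpout", new RedisWriteSpout(), 2);
        builder.setBolt("to-save", storeBolt, 1).shuffleGrouping("RedisWriteSpout");

        Config conf = new Config();
        conf.setNumWorkers(2); // arbitrary worker count for illustration
        // no sleep/kill here: the cluster keeps the topology running until it is killed explicitly
        StormSubmitter.submitTopology("redis-write-topology", conf, builder.createTopology());
    }
}
```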