1. Environment preparation
1.1 Preparing the Flink environment
For how to install Flink, this post explains it in detail: https://blog.csdn.net/qq_43699958/article/details/132826440. Then start the Flink cluster from Flink's bin directory:
[root@localhost bin]# ./start-cluster.sh
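To confirm the cluster actually came up, a quick hedged check (the daemon names below are what Flink's standalone scripts launch, and 8081 is the web UI's default port, which may differ if flink-conf.yaml was changed):

# jps should show a JobManager (StandaloneSessionClusterEntrypoint) and a TaskManager (TaskManagerRunner)
jps
# the web UI is served on port 8081 by default
curl http://localhost:8081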
1.2 Preparing the Linux environment
1.2.1 Disabling the Linux firewall
The commands used are:
a. Check the firewall status: firewall-cmd --state (a "not running" result means the firewall is not active)
b. Stop the firewall: systemctl stop firewalld.service
c. Disable it at boot: systemctl disable firewalld.service
[root@localhost bin]# firewall-cmd --state
not running
[root@localhost bin]# systemctl stop firewalld.service
[root@localhost bin]# systemctl disable firewalld.service
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
[root@localhost bin]#
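Disabling the firewall entirely is the simplest choice on a test machine. If you would rather keep firewalld running, an alternative (a sketch, assuming firewalld is in use) is to open only the ports this walkthrough needs, 8888 for the nc source and 8081 for the Flink web UI:

firewall-cmd --permanent --add-port=8888/tcp
firewall-cmd --permanent --add-port=8081/tcp
firewall-cmd --reload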
1.2.2 Preparing the hosts file
Because the Java code below calls socketTextStream("hadoop01", 8888), which refers to a host name, /etc/hosts on the Linux machine needs to be edited. Add the following line to the file:
192.168.126.223 hadoop01
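To make sure the mapping is picked up, a quick check (a sketch; the IP must of course match your own machine):

# both should resolve hadoop01 to 192.168.126.223
getent hosts hadoop01
ping -c 1 hadoop01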
1.3 Preparing nc
Start a data source with the following command:
[root@localhost ~]# nc -lk 8888
The options in the command above mean:
-k, --keep-open            Accept multiple connections in listen mode
-l, --listen               Bind and listen for incoming connections
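If you want to sanity-check the listener before Flink is involved, a sketch: connect from a second terminal the same way socketTextStream later will, and anything typed on one side should show up on the other:

nc hadoop01 8888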
1.4 Preparing the jar
The code is as follows; just package it into a jar:
package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author Amos
 * @date 2023/9/11
 */
public class WordCountStreamUnboundedDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read an unbounded stream from the socket source started with nc
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop01", 8888);

        // split each line into words, emit (word, 1), key by the word and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                    String[] words = value.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);

        sum.print();

        env.execute();
    }
}
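The original does not show how the jar is built; as a sketch, assuming a standard Maven project that declares flink-streaming-java and flink-clients 1.17.1 as dependencies, packaging comes down to:

# run from the project root; the jar ends up under target/
mvn clean package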
2. Submitting the jar
2.1 First, start nc
[root@localhost ~]# nc -lk 8888
2.2 Submitting the jar in the Flink environment
For convenience, the jar was dropped straight into the bin directory; its name comes from the project of the Java code above, MyFlinkTutorial-117-1.0-SNAPSHOT.jar:
[root@localhost bin]# pwd
/root/flink-1.17.1/bin
[root@localhost bin]# ll
total 2364
-rw-r--r--. 1 501 games 2290658 May 19 07:13 bash-java-utils.jar
-rwxr-xr-x. 1 501 games 22811 May 19 04:43 config.sh
-rwxr-xr-x. 1 501 games 1318 May 19 04:43 find-flink-home.sh
-rwxr-xr-x. 1 501 games 2381 May 19 04:43 flink
-rwxr-xr-x. 1 501 games 4446 May 19 04:43 flink-console.sh
-rwxr-xr-x. 1 501 games 6783 May 19 04:43 flink-daemon.sh
-rwxr-xr-x. 1 501 games 1564 May 19 04:43 historyserver.sh
-rwxr-xr-x. 1 501 games 2498 May 19 04:43 jobmanager.sh
-rwxr-xr-x. 1 501 games 1650 May 19 04:43 kubernetes-jobmanager.sh
-rwxr-xr-x. 1 501 games 1717 May 19 04:43 kubernetes-session.sh
-rwxr-xr-x. 1 501 games 1770 May 19 04:43 kubernetes-taskmanager.sh
-rw-r--r--. 1 root root 9055 Sep 12 03:41 MyFlinkTutorial-117-1.0-SNAPSHOT.jar
-rwxr-xr-x. 1 501 games 2994 May 19 04:43 pyflink-shell.sh
-rwxr-xr-x. 1 501 games 4051 May 19 04:44 sql-client.sh
-rwxr-xr-x. 1 501 games 3299 May 19 04:44 sql-gateway.sh
-rwxr-xr-x. 1 501 games 2006 May 19 04:43 standalone-job.sh
-rwxr-xr-x. 1 501 games 1837 May 19 04:43 start-cluster.sh
-rwxr-xr-x. 1 501 games 1854 May 19 04:43 start-zookeeper-quorum.sh
-rwxr-xr-x. 1 501 games 1617 May 19 04:43 stop-cluster.sh
-rwxr-xr-x. 1 501 games 1845 May 19 04:43 stop-zookeeper-quorum.sh
-rwxr-xr-x. 1 501 games 2960 May 19 04:43 taskmanager.sh
-rwxr-xr-x. 1 501 games 1725 May 19 04:43 yarn-session.sh
-rwxr-xr-x. 1 501 games 2405 May 19 04:43 zookeeper.sh
[root@localhost bin]#
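As a side note, the jar does not have to sit in bin; ./flink run accepts a path to the jar, so something like the following (hypothetical path) works just as well:

./flink run -c com.atguigu.wc.WordCountStreamUnboundedDemo /root/jars/MyFlinkTutorial-117-1.0-SNAPSHOT.jar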
Since the Flink cluster was already started in 1.1 and nc was started in 2.1, the jar can be submitted directly here. The command is:
./flink run -c com.atguigu.wc.WordCountStreamUnboundedDemo MyFlinkTutorial-117-1.0-SNAPSHOT.jar
In this command, -c specifies the entry class, com.atguigu.wc.WordCountStreamUnboundedDemo, followed by the jar name. Submitting it produces a JobID:
[root@localhost bin]# ./flink run -c com.atguigu.wc.WordCountStreamUnboundedDemo MyFlinkTutorial-117-1.0-SNAPSHOT.jar
Job has been submitted with JobID 28526936241a9f2486c15c9ddb7faa64
The job was submitted and is running normally. For details on how to configure the web UI, see this article (also written by me, so it follows on naturally): https://blog.csdn.net/qq_43699958/article/details/132826440
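The same CLI can also be used to inspect or stop the job afterwards; a brief sketch (substitute the JobID printed above):

# list running and scheduled jobs
./flink list
# cancel the streaming job by its JobID
./flink cancel 28526936241a9f2486c15c9ddb7faa64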
Type some words into nc to test:
[root@localhost ~]# nc -lk 8888
google flink
haerbin
tangshan
hangzhou
kunming
留下青春的都
jiayou nikeyi 搞定
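Because the job ends with sum.print(), the word counts do not come back to the nc terminal; they go to the TaskManager's stdout, which can be viewed in the web UI (Task Managers -> Stdout) or in the .out file under the log directory. A sketch of the latter (the exact file name depends on user and host):

tail -f /root/flink-1.17.1/log/*taskexecutor*.out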