新建maven程序
pom.xml依赖如下:
然后新建一个TestSocketWindowWordCount类具体代码如下
然后启动flink集群->新建一个监听:nc -lk 6666
然后启动TestSocketWindowWordCount类
在linux监听页面输入代码
观察在idea控制台就有统计的输出
-------pom.xml开始-----------
-------pom.xml结束-----------
-------TestSocketWindowWordCount开始------------------
package com.gyb;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
创新互联公司服务项目包括石柱土家族网站建设、石柱土家族网站制作、石柱土家族网页制作以及石柱土家族网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,石柱土家族网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到石柱土家族省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!
import javax.xml.soap.Text;
public class TestSocketWindowWordCount {
public static void main(String args[]) {
String hostname = "192.168.198.130";
int port = 6666;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream text = env.socketTextStream(hostname, port, "\n");//获取执行环境
SingleOutputStreamOperator windowCounts = text
.flatMap(new FlatMapFunction
br/>@Override
out) {
for (String word : value.split("\s")) {
out.collect(new SocketWindowWordCount.WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(5))
.reduce(new ReduceFunction
br/>@Override
return new SocketWindowWordCount.WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
//env.execute("Socket Window WordCount");
try {
env.execute("Socket Window WordCount");
} catch (Exception e) {
e.printStackTrace();
}
}
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
-------TestSocketWindowWordCount结束------------------