Flink算子

==作者:YB-Chi==

[toc]

数据流转换

准备的源算子

1
2
3
4
5
6
7
8
9
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(new Event("mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Bob", "./cart", 2100L),
new Event("mary", "./home", 1100L),
new Event("Bob", "./cart", 2200L),
new Event("Bob", "./cart", 2300L),
new Event("Bob", "./cart", 2400L),
new Event("Alice", "./port", 3000L));

注意:每个算子代码块1为官方提供,代码块2为结合源算子的自编样例.

Map

DataStream → DataStream

输入一个元素同时输出一个元素。

下面是将输入流中元素数值加倍的 map function:

1
2
DataStream<Integer> dataStream = //...
dataStream.map(num -> num*2);

下面是将输入流中元素取一个字段的 map function:

1
2
3
SingleOutputStreamOperator<String> res = stream.map(d -> d.user);
res.print();
env.execute();

Filter

DataStream → DataStream

为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素。

下面是过滤掉零值的 filter:

1
dataStream.filter(num -> num!=0);

下面是过滤掉名称不叫Alice的 filter:

1
stream.filter(d -> "Alice".equals(d.user));

FlatMap

DataStream → DataStream

输入一个元素同时产生零个、一个或多个元素。

下面是将句子拆分为单词的 flatmap function:

1
2
3
4
5
stream.flatMap((String value, Collector<String> out) -> {
for(String word: value.split(" ")){
out.collect(word);
}
});

下面是将event按人名收集不同信息的 flatmap function:

1
2
3
4
5
6
7
8
stream.flatMap((Event d, Collector<String> out) -> {
if (d.user.equals("mary"))
out.collect(d.url);
else if (d.user.equals("Bob")) {
out.collect(d.user);
out.collect(d.url);
}
}).returns(new TypeHint<String>(){});

KeyBy

DataStream → KeyedStream

在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的。有多种指定 key 的方式。

1
2
dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);

以下情况,一个类不能作为 key

  1. 它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于 Object.hashCode() 实现。
  2. 它是任意类的数组。

聚合样例

1
2
3
stream.keyBy( event -> event.user).max("timestamp").print("max");
stream.keyBy( event -> event.user).maxBy("timestamp").print("maxBy");
env.execute();

Reduce

KeyedStream → DataStream

在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。

下面是创建局部求和流的 reduce function:

1
2
3
4
5
6
7
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});

下面是统计每个用户的访问频次的 reduce function:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 1.统计每个用户的点击次数
SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event event) throws Exception {
return Tuple2.of(event.user, 1L);
}
}).keyBy(data -> data.f0)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
});
// 2.选取最活跃的用户
SingleOutputStreamOperator<Tuple2<String, Long>> res = clicksByUser.keyBy(data -> "key").reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) {
return value1.f1 > value2.f1 ? value1 : value2;
}
});
文章作者: CYBSKY
文章链接: https://cybsky.top/2022/09/23/cyb-mds/bigdata/Flink/Flink算子/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 CYBSKY