==作者:YB-Chi==
[toc]
数据流转换
准备的源算子
1 2 3 4 5 6 7 8 9
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); DataStreamSource<Event> stream = env.fromElements(new Event("mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Bob", "./cart", 2100L), new Event("mary", "./home", 1100L), new Event("Bob", "./cart", 2200L), new Event("Bob", "./cart", 2300L), new Event("Bob", "./cart", 2400L), new Event("Alice", "./port", 3000L));
|
注意:每个算子代码块1为官方提供,代码块2为结合源算子的自编样例.
Map
DataStream → DataStream
输入一个元素同时输出一个元素。
下面是将输入流中元素数值加倍的 map function:
1 2
| DataStream<Integer> dataStream = dataStream.map(num -> num*2);
|
下面是将输入流中元素取一个字段的 map function:
1 2 3
| SingleOutputStreamOperator<String> res = stream.map(d -> d.user); res.print(); env.execute();
|
Filter
DataStream → DataStream
为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素。
下面是过滤掉零值的 filter:
1
| dataStream.filter(num -> num!=0);
|
下面是过滤掉名称不叫Alice的 filter:
1
| stream.filter(d -> "Alice".equals(d.user));
|
FlatMap
DataStream → DataStream
输入一个元素同时产生零个、一个或多个元素。
下面是将句子拆分为单词的 flatmap function:
1 2 3 4 5
| stream.flatMap((String value, Collector<String> out) -> { for(String word: value.split(" ")){ out.collect(word); } });
|
下面是将event按人名收集不同信息的 flatmap function:
1 2 3 4 5 6 7 8
| stream.flatMap((Event d, Collector<String> out) -> { if (d.user.equals("mary")) out.collect(d.url); else if (d.user.equals("Bob")) { out.collect(d.user); out.collect(d.url); } }).returns(new TypeHint<String>(){});
|
KeyBy
DataStream → KeyedStream
在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的。有多种指定 key 的方式。
1 2
| dataStream.keyBy(value -> value.getSomeKey()); dataStream.keyBy(value -> value.f0);
|
以下情况,一个类不能作为 key:
- 它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于 Object.hashCode() 实现。
- 它是任意类的数组。
聚合样例
1 2 3
| stream.keyBy( event -> event.user).max("timestamp").print("max"); stream.keyBy( event -> event.user).maxBy("timestamp").print("maxBy"); env.execute();
|
Reduce
KeyedStream → DataStream
在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。
下面是创建局部求和流的 reduce function:
1 2 3 4 5 6 7
| keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } });
|
下面是统计每个用户的访问频次的 reduce function:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(Event event) throws Exception { return Tuple2.of(event.user, 1L); } }).keyBy(data -> data.f0) .reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) { return Tuple2.of(value1.f0, value1.f1 + value2.f1); } });
SingleOutputStreamOperator<Tuple2<String, Long>> res = clicksByUser.keyBy(data -> "key").reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) { return value1.f1 > value2.f1 ? value1 : value2; } });
|