iis默认网站删除,西安专业房产网站建设,微信小程序开发一般多少钱,网站建设方法:背景
在使用flink进行集成测试时#xff0c;我们会使用MiniClusterWithClientResource类#xff0c;但是当我们断点导致在某个方法执行的时间比较长时#xff0c;会有错误发生#xff0c;那么该如何解决这个错误呢#xff1f;
处理concurrent.TimeoutException: Heartbe…背景
在使用flink进行集成测试时我们会使用MiniClusterWithClientResource类但是当我们断点导致在某个方法执行的时间比较长时会有错误发生那么该如何解决这个错误呢
处理concurrent.TimeoutException: Heartbeat of TaskManager with id错误
其实关键的配置是heartbeat.timeout这个错误是JobManager抛出的意思是和某个TaskManager的心跳中断超过了指定的时间我们把这个参数配置到MiniClusterWithClientResource类中就可以了代码如下所示
public class FlinkIntegrationTest {public static final Configuration config Configuration.fromMap(new HashMapString, String() {{put(heartbeat.timeout, 300000);}});ClassRulepublic static MiniClusterWithClientResource flinkCluster new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberSlotsPerTaskManager(1).setNumberTaskManagers(3).build());Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap new StatefulFlatMap();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements(world, hi).keyBy(e - 1).flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList(hello world, hello hi world)));}Testpublic void testStateFlatMap1() throws Exception {StatefulFlatMap statefulFlatMap new StatefulFlatMap();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements(world, hi, world).keyBy(e - e).flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList(hello world, hello hi, hello world world)));}// create a testing sinkprivate static class CollectSink implements SinkFunctionString {// must be staticpublic static final ListString values Collections.synchronizedList(new ArrayList());Overridepublic void invoke(String value, Context context) throws Exception {values.add(value);}}}