[FLINK-33100] Implement YarnJobListFetcher#1031
[FLINK-33100] Implement YarnJobListFetcher#1031Samrat002 wants to merge 1 commit intoapache:mainfrom
Conversation
6ba949e to
87eaa35
Compare
1996fanrui
left a comment
There was a problem hiding this comment.
Hey @Samrat002 , thanks for picking it up! and sorry for the late review.
I have some questions about this PR, please help take a look when you are available, thanks
| --autoscaler.standalone.fetcher.type FLINK_CLUSTER|YARN | ||
| ``` | ||
|
|
||
| When running against Flink-on-YARN (`YARN`), set the host/port to the YARN web proxy endpoint that exposes the JobManager REST API. |
There was a problem hiding this comment.
set the host/port to the YARN web proxy endpoint
Do you mean autoscaler.standalone.fetcher.flink-cluster.host and autoscaler.standalone.fetcher.flink-cluster.port?
If yes, it does not make sense. Because all config options with autoscaler.standalone.fetcher.flink-cluster prefix are related to flink-cluster. It is better to introduce yarn cluster related config options.
| To select the job fetcher use: | ||
|
|
||
| ``` | ||
| --autoscaler.standalone.fetcher.type FLINK_CLUSTER|YARN |
There was a problem hiding this comment.
How about introducing a whole demo for yarn mode?
| We will implement `YarnJobListFetcher` in the future, `Flink Autoscaler Standalone` will call | ||
| `YarnJobListFetcher#fetch` to fetch job list from yarn cluster periodically. | ||
| Currently `FlinkClusterJobListFetcher` and `YarnJobListFetcher` are implementations of the | ||
| `JobListFetcher` interface. that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far. |
There was a problem hiding this comment.
that's why
Flink Autoscaler Standaloneonly supports a single Flink cluster so far.
It no longer makes sense after adding YARN support, and the sentence should either be removed or rewritten to explain that each fetcher instance still monitors a single cluster or YARN deployment.
| default: | ||
| return (JobListFetcher<KEY, Context>) | ||
| new FlinkClusterJobListFetcher( | ||
| clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT)); |
There was a problem hiding this comment.
The default value of AutoscalerStandaloneOptions.FETCHER_TYPE is FLINK_CLUSTER, so including default case that falls back to FLINK_CLUSTER here does not make sense, because it silently accepts invalid configuration values. Throwing an exception for unknown fetcher types is better. It could prevent potential bugs if introducing new type in the future.
| public static final ConfigOption<FetcherType> FETCHER_TYPE = | ||
| autoscalerStandaloneConfig("fetcher.type") | ||
| .enumType(FetcherType.class) | ||
| .defaultValue(FetcherType.FLINK_CLUSTER) | ||
| .withDescription( | ||
| "The job list fetcher type to use. Supported values: FLINK_CLUSTER, YARN."); |
There was a problem hiding this comment.
https://github.com/apache/flink-kubernetes-operator/blob/main/docs/README.md
Please generate docs according to this doc. Also, IIRC, it is not needed to mentioned values, and doc tools will list all values by default.
| } catch (Throwable ignore) { | ||
| // Ignore |
There was a problem hiding this comment.
It suppresses all exceptions including critical ones like OutOfMemoryError without any logging, making it impossible to diagnose why YARN-based job discovery failed, such as: do not know if there are configuration issues, network problems, or authentication failures.
| return discovered; | ||
| } | ||
|
|
||
| // use supplied client factory (may point to direct JM or a reverse proxy) |
There was a problem hiding this comment.
why fallback to JM or flink cluster here? If this is what the user expects, why choosing yarn cluster fetcher instead of flink cluster fetcher?
| yarnClient = YarnClient.createYarnClient(); | ||
| org.apache.hadoop.conf.Configuration yarnConf = | ||
| new org.apache.hadoop.conf.Configuration(); | ||
| yarnClient.init(yarnConf); | ||
| yarnClient.start(); |
There was a problem hiding this comment.
Creating YarnClient without any Hadoop configuration, I am not sure whether it works. Generally, it needs Hadoop configuration files like core-site.xml or yarn-site.xml that might be present in the classpath.
| } | ||
| break; | ||
| } | ||
| } catch (Throwable ignore) { |
There was a problem hiding this comment.
This catch does not provide fault isolation among jobs or yarn applications, if one job is stuck on GC or something else, the autoscaler won't work for all applciations.
| <dependency> | ||
| <groupId>org.apache.flink</groupId> | ||
| <artifactId>flink-yarn</artifactId> | ||
| <version>${flink.version}</version> | ||
| </dependency> |
There was a problem hiding this comment.
Is it possible to minimize the scope of dependencies? For example, only yarn-client is added here.
Also, is it needed to exclude some dependencies to avoid dependency conflicts?
|
@Samrat002 are you still working on this? |
What is the purpose of the change
YarnJobListFetcher Implementation
Does this pull request potentially affect one of the following parts:
CustomResourceDescriptors: (yes / no)Documentation