Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,13 @@ Map<String, FlinkMetric> getFilteredVertexMetricNames(
.findAny(allMetricNames)
.ifPresent(
m -> filteredMetrics.put(m, FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT));
} else {
FlinkMetric.NUM_RECORDS_IN_PER_SEC
.findAny(allMetricNames)
.ifPresent(m -> filteredMetrics.put(m, FlinkMetric.NUM_RECORDS_IN_PER_SEC));
FlinkMetric.NUM_RECORDS_OUT_PER_SEC
.findAny(allMetricNames)
.ifPresent(m -> filteredMetrics.put(m, FlinkMetric.NUM_RECORDS_OUT_PER_SEC));
}

for (FlinkMetric flinkMetric : requiredMetrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ protected static boolean isProcessingBacklog(
vertex -> {
double lag = lastMetrics.get(vertex).getOrDefault(LAG, 0.0);
double inputRateAvg =
getRate(ScalingMetric.NUM_RECORDS_IN, vertex, metricsHistory);
getAverageWithRateFallback(
ScalingMetric.NUM_RECORDS_IN_PER_SECOND,
ScalingMetric.NUM_RECORDS_IN,
vertex,
metricsHistory);
if (Double.isNaN(inputRateAvg)) {
return false;
}
Expand Down Expand Up @@ -139,7 +143,12 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(

var vertexInfo = topology.get(vertex);

double inputRateAvg = getRate(ScalingMetric.NUM_RECORDS_IN, vertex, metricsHistory);
double inputRateAvg =
getAverageWithRateFallback(
ScalingMetric.NUM_RECORDS_IN_PER_SECOND,
ScalingMetric.NUM_RECORDS_IN,
vertex,
metricsHistory);

var evaluatedMetrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
computeTargetDataRate(
Expand Down Expand Up @@ -331,7 +340,7 @@ private void computeTargetDataRate(
if (topology.isSource(vertex)) {
double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();

double lagRate = getRate(LAG, vertex, metricsHistory);
double lagRate = getAverageRate(LAG, vertex, metricsHistory);
double ingestionDataRate = Math.max(0, inputRate + lagRate);
if (Double.isNaN(ingestionDataRate)) {
throw new RuntimeException(
Expand Down Expand Up @@ -494,6 +503,49 @@ public static double getAverage(
return n < minElements ? Double.NaN : sum / n;
}

/**
 * Computes the average per-second rate of change of a cumulative {@code metric} over the
 * collected metric history.
 *
 * <p>For each adjacent pair of history entries that both carry a valid (non-NaN) value for
 * the vertex, the value delta divided by the elapsed wall-clock time (converted to seconds)
 * contributes one rate sample; the result is the arithmetic mean of those samples. Entries
 * with a non-positive timestamp gap are ignored to avoid division by zero.
 *
 * @param metric the cumulative scaling metric to differentiate (e.g. LAG)
 * @param jobVertexId vertex whose metrics are read from each history entry; nullability
 *     follows the keying of the vertex-metrics map
 * @param metricsHistory time-ordered (ascending) history of collected metrics
 * @return the average rate per second, or {@code Double.NaN} when fewer than two usable
 *     observations exist
 */
public static double getAverageRate(
        ScalingMetric metric,
        @Nullable JobVertexID jobVertexId,
        SortedMap<Instant, CollectedMetrics> metricsHistory) {

    double sumRates = 0;
    int n = 0;
    Instant prevTs = null;
    double prevValue = Double.NaN;

    for (var entry : metricsHistory.entrySet()) {
        // Guard against history entries that do not contain this vertex at all
        // (e.g. recorded before/after a topology change); the unchecked
        // .get(jobVertexId) would otherwise throw a NullPointerException.
        var vertexMetrics = entry.getValue().getVertexMetrics().get(jobVertexId);
        if (vertexMetrics == null) {
            continue;
        }
        double value = vertexMetrics.getOrDefault(metric, Double.NaN);
        if (Double.isNaN(value)) {
            continue;
        }
        if (!Double.isNaN(prevValue)) {
            long tsDiff = Duration.between(prevTs, entry.getKey()).toMillis();
            if (tsDiff > 0) {
                // Scale the per-millisecond delta up to a per-second rate.
                sumRates += 1000.0 * (value - prevValue) / tsDiff;
                n++;
            }
        }
        prevValue = value;
        prevTs = entry.getKey();
    }
    return n == 0 ? Double.NaN : sumRates / n;
}

/**
 * Returns the average of {@code metric} over the history, falling back to the derived rate
 * of {@code fallbackMetric} when no finite average is available.
 *
 * <p>This lets callers prefer a directly-reported per-second metric while still working on
 * clusters that only expose the cumulative counter, whose rate must be computed instead.
 *
 * @param metric the preferred (per-second) metric to average
 * @param fallbackMetric the cumulative metric whose rate is used as a fallback
 * @param jobVertexId vertex whose metrics are read; nullability follows the metrics keying
 * @param metricsHistory time-ordered history of collected metrics
 * @return the finite average of {@code metric}, otherwise the rate of {@code fallbackMetric}
 */
public static double getAverageWithRateFallback(
        ScalingMetric metric,
        ScalingMetric fallbackMetric,
        @Nullable JobVertexID jobVertexId,
        SortedMap<Instant, CollectedMetrics> metricsHistory) {
    double avg = getAverage(metric, jobVertexId, metricsHistory);
    if (Double.isFinite(avg)) {
        return avg;
    }
    return getRate(fallbackMetric, jobVertexId, metricsHistory);
}

/**
* Compute the In/Out ratio between the (from, to) vertices. The rate estimates the number of
* output records produced to the downstream vertex for every input received for the upstream
Expand All @@ -513,7 +565,12 @@ protected static double computeEdgeOutputRatio(
JobTopology topology,
SortedMap<Instant, CollectedMetrics> metricsHistory) {

double inputRate = getRate(ScalingMetric.NUM_RECORDS_IN, from, metricsHistory);
double inputRate =
getAverageWithRateFallback(
ScalingMetric.NUM_RECORDS_IN_PER_SECOND,
ScalingMetric.NUM_RECORDS_IN,
from,
metricsHistory);

double outputRatio = 0;
// If the input rate is zero, we also need to flatten the output rate.
Expand Down Expand Up @@ -552,7 +609,11 @@ protected static double computeEdgeDataRate(
if (toVertexInputs.size() == 1) {
LOG.debug(
"Computing edge ({}, {}) data rate for single input downstream task", from, to);
return getRate(ScalingMetric.NUM_RECORDS_IN, to, metricsHistory);
return getAverageWithRateFallback(
ScalingMetric.NUM_RECORDS_IN_PER_SECOND,
ScalingMetric.NUM_RECORDS_IN,
to,
metricsHistory);
}

// Case 2: Downstream vertex has only inputs from upstream vertices which don't have other
Expand All @@ -565,7 +626,11 @@ protected static double computeEdgeDataRate(
}
if (topology.get(input).getOutputs().size() == 1) {
inputRateFromOtherVertices +=
getRate(ScalingMetric.NUM_RECORDS_OUT, input, metricsHistory);
getAverageWithRateFallback(
ScalingMetric.NUM_RECORDS_OUT_PER_SECOND,
ScalingMetric.NUM_RECORDS_OUT,
input,
metricsHistory);
} else {
// Output vertex has multiple outputs, cannot use this information...
inputRateFromOtherVertices = Double.NaN;
Expand All @@ -577,15 +642,24 @@ protected static double computeEdgeDataRate(
"Computing edge ({}, {}) data rate by subtracting upstream input rates",
from,
to);
return getRate(ScalingMetric.NUM_RECORDS_IN, to, metricsHistory)
- inputRateFromOtherVertices;
double inputRateAvg =
getAverageWithRateFallback(
ScalingMetric.NUM_RECORDS_IN_PER_SECOND,
ScalingMetric.NUM_RECORDS_IN,
to,
metricsHistory);
return inputRateAvg - inputRateFromOtherVertices;
}

// Case 3: We fall back simply to num records out, this is the least reliable
LOG.debug(
"Computing edge ({}, {}) data rate by falling back to from num records out",
from,
to);
return getRate(ScalingMetric.NUM_RECORDS_OUT, from, metricsHistory);
return getAverageWithRateFallback(
ScalingMetric.NUM_RECORDS_OUT_PER_SECOND,
ScalingMetric.NUM_RECORDS_OUT,
from,
metricsHistory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public enum FlinkMetric {
SOURCE_TASK_NUM_RECORDS_IN(s -> s.startsWith("Source__") && s.endsWith(".numRecordsIn")),
PENDING_RECORDS(s -> s.endsWith(".pendingRecords")),
BACKPRESSURE_TIME_PER_SEC(s -> s.equals("backPressuredTimeMsPerSecond")),
NUM_RECORDS_IN_PER_SEC(s -> s.equals("numRecordsInPerSecond")),
NUM_RECORDS_OUT_PER_SEC(s -> s.equals("numRecordsOutPerSecond")),

HEAP_MEMORY_MAX(s -> s.equals("Status.JVM.Memory.Heap.Max")),
HEAP_MEMORY_USED(s -> s.equals("Status.JVM.Memory.Heap.Used")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ public enum ScalingMetric {

NUM_RECORDS_IN(false),

NUM_RECORDS_IN_PER_SECOND(false),

NUM_RECORDS_OUT(false),

NUM_RECORDS_OUT_PER_SECOND(false),

ACCUMULATED_BUSY_TIME(false),

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ public static void computeDataRateMetrics(
.orElseGet(observedTprAvg);
scalingMetrics.put(ScalingMetric.OBSERVED_TPR, observedTprOpt);
}
} else {
var inPerSec = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
if (inPerSec != null) {
scalingMetrics.put(ScalingMetric.NUM_RECORDS_IN_PER_SECOND, inPerSec.getSum());
}
var outPerSec = flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
if (outPerSec != null) {
scalingMetrics.put(ScalingMetric.NUM_RECORDS_OUT_PER_SECOND, outPerSec.getSum());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@

import javax.annotation.Nullable;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -113,9 +115,12 @@ void testMetricReporting() throws Exception {
scalingRealizer,
stateStore);

var now = Instant.now();
autoscaler.setClock(Clock.fixed(now, ZoneId.systemDefault()));
autoscaler.scale(context);

metricsCollector.updateMetrics(jobVertexID, m -> m.setNumRecordsIn(100));
autoscaler.setClock(Clock.fixed(now.plus(Duration.ofSeconds(10)), ZoneId.systemDefault()));
autoscaler.scale(context);

MetricGroup metricGroup = autoscaler.flinkMetrics.get(context.getJobKey()).getMetricGroup();
Expand Down