Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sdf kafka poll latencies #34275

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,7 @@ public MetricUpdates getUpdates() {

// Add any metricKey labels to the monitoringInfoLabels.
if (!metricName.getLabels().isEmpty()) {
for (Map.Entry<String, String> entry : metricName.getLabels().entrySet()) {
builder.setLabel(entry.getKey(), entry.getValue());
}
builder.setLabels(metricName.getLabels());
}
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ public static double decodeDoubleCounter(ByteString payload) {
}
}

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
/** Encodes to {@link MonitoringInfoConstants.TypeUrns#HISTOGRAM}. */
public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
return inputHistogram.toProto().toByteString();
}

/** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
/** Decodes to {@link MonitoringInfoConstants.TypeUrns#HISTOGRAM}. */
public static HistogramData decodeInt64Histogram(ByteString payload) {
try {
return new HistogramData(HistogramValue.parseFrom(payload));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,122 +454,125 @@ public ProcessContinuation processElement(
final Stopwatch sw = Stopwatch.createStarted();

while (true) {
rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition());
// When there are no records available for the current TopicPartition, self-checkpoint
// and move to process the next element.
if (rawRecords.isEmpty()) {
if (!topicPartitionExists(
kafkaSourceDescriptor.getTopicPartition(),
consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) {
return ProcessContinuation.stop();
}
if (timestampPolicy != null) {
updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
}
return ProcessContinuation.resume();
}
for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
// If the Kafka consumer returns a record with an offset that is already processed
// the record can be safely skipped. This is needed because there is a possibility
// that the seek() above fails to move the offset to the desired position. In which
// case poll() would return records that are already cnsumed.
if (rawRecord.offset() < startOffset) {
// If the start offset is not reached even after skipping the records for 10 seconds
// then the processing is stopped with a backoff to give the Kakfa server some time
// catch up.
if (sw.elapsed().getSeconds() > 10L) {
LOG.error(
"The expected offset ({}) was not reached even after"
+ " skipping consumed records for 10 seconds. The offset we could"
+ " reach was {}. The processing of this bundle will be attempted"
+ " at a later time.",
expectedOffset,
rawRecord.offset());
return ProcessContinuation.resume()
.withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
}
skippedRecords++;
continue;
}
if (skippedRecords > 0L) {
LOG.warn(
"{} records were skipped due to seek returning an"
+ " earlier position than requested position of {}",
skippedRecords,
expectedOffset);
skippedRecords = 0L;
}
if (!tracker.tryClaim(rawRecord.offset())) {
return ProcessContinuation.stop();
}
try {
KafkaRecord<K, V> kafkaRecord =
new KafkaRecord<>(
rawRecord.topic(),
rawRecord.partition(),
rawRecord.offset(),
ConsumerSpEL.getRecordTimestamp(rawRecord),
ConsumerSpEL.getRecordTimestampType(rawRecord),
ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
ConsumerSpEL.deserializeKey(keyDeserializerInstance, rawRecord),
ConsumerSpEL.deserializeValue(valueDeserializerInstance, rawRecord));
int recordSize =
(rawRecord.key() == null ? 0 : rawRecord.key().length)
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
avgRecordSizeCache
.getUnchecked(kafkaSourceDescriptor)
.update(recordSize, rawRecord.offset() - expectedOffset);
rawSizes.update(recordSize);
expectedOffset = rawRecord.offset() + 1;
Instant outputTimestamp;
// The outputTimestamp and watermark will be computed by timestampPolicy, where the
// WatermarkEstimator should be a manual one.
if (timestampPolicy != null) {
TimestampPolicyContext context =
updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
} else {
Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
try {
rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition(), kafkaMetrics);
// When there are no records available for the current TopicPartition, self-checkpoint
// and move to process the next element.
if (rawRecords.isEmpty()) {
if (!topicPartitionExists(
kafkaSourceDescriptor.getTopicPartition(),
consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) {
return ProcessContinuation.stop();
}
receiver
.get(recordTag)
.outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
} catch (SerializationException e) {
// This exception should only occur during the key and value deserialization when
// creating the Kafka Record
badRecordRouter.route(
receiver,
rawRecord,
null,
e,
"Failure deserializing Key or Value of Kakfa record reading from Kafka");
if (timestampPolicy != null) {
updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
}
return ProcessContinuation.resume();
}
for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
// If the Kafka consumer returns a record with an offset that is already processed
// the record can be safely skipped. This is needed because there is a possibility
// that the seek() above fails to move the offset to the desired position. In which
// case poll() would return records that are already cnsumed.
if (rawRecord.offset() < startOffset) {
// If the start offset is not reached even after skipping the records for 10 seconds
// then the processing is stopped with a backoff to give the Kakfa server some time
// catch up.
if (sw.elapsed().getSeconds() > 10L) {
LOG.error(
"The expected offset ({}) was not reached even after"
+ " skipping consumed records for 10 seconds. The offset we could"
+ " reach was {}. The processing of this bundle will be attempted"
+ " at a later time.",
expectedOffset,
rawRecord.offset());
return ProcessContinuation.resume()
.withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
}
skippedRecords++;
continue;
}
if (skippedRecords > 0L) {
LOG.warn(
"{} records were skipped due to seek returning an"
+ " earlier position than requested position of {}",
skippedRecords,
expectedOffset);
skippedRecords = 0L;
}
if (!tracker.tryClaim(rawRecord.offset())) {
return ProcessContinuation.stop();
}
try {
KafkaRecord<K, V> kafkaRecord =
new KafkaRecord<>(
rawRecord.topic(),
rawRecord.partition(),
rawRecord.offset(),
ConsumerSpEL.getRecordTimestamp(rawRecord),
ConsumerSpEL.getRecordTimestampType(rawRecord),
ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
ConsumerSpEL.deserializeKey(keyDeserializerInstance, rawRecord),
ConsumerSpEL.deserializeValue(valueDeserializerInstance, rawRecord));
int recordSize =
(rawRecord.key() == null ? 0 : rawRecord.key().length)
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
avgRecordSizeCache
.getUnchecked(kafkaSourceDescriptor)
.update(recordSize, rawRecord.offset() - expectedOffset);
rawSizes.update(recordSize);
expectedOffset = rawRecord.offset() + 1;
Instant outputTimestamp;
// The outputTimestamp and watermark will be computed by timestampPolicy, where the
// WatermarkEstimator should be a manual one.
if (timestampPolicy != null) {
TimestampPolicyContext context =
updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
} else {
Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
}
receiver
.get(recordTag)
.outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
} catch (SerializationException e) {
// This exception should only occur during the key and value deserialization when
// creating the Kafka Record
badRecordRouter.route(
receiver,
rawRecord,
null,
e,
"Failure deserializing Key or Value of Kakfa record reading from Kafka");
if (timestampPolicy != null) {
updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
}
}
}
}

backlogBytes.set(
(long)
(BigDecimal.valueOf(
Preconditions.checkStateNotNull(
offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
.doubleValue()
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
kafkaResults.updateBacklogBytes(
kafkaSourceDescriptor.getTopic(),
kafkaSourceDescriptor.getPartition(),
(long)
(BigDecimal.valueOf(
Preconditions.checkStateNotNull(
offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
.doubleValue()
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
kafkaResults.flushBufferedMetrics();
backlogBytes.set(
(long)
(BigDecimal.valueOf(
Preconditions.checkStateNotNull(
offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
.doubleValue()
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
kafkaMetrics.updateBacklogBytes(
kafkaSourceDescriptor.getTopic(),
kafkaSourceDescriptor.getPartition(),
(long)
(BigDecimal.valueOf(
Preconditions.checkStateNotNull(
offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
.doubleValue()
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
} finally {
kafkaMetrics.flushBufferedMetrics();
}
}
}
}
Expand All @@ -583,13 +586,15 @@ private boolean topicPartitionExists(

// see https://github.com/apache/beam/issues/25962
private ConsumerRecords<byte[], byte[]> poll(
Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
Consumer<byte[], byte[]> consumer, TopicPartition topicPartition, KafkaMetrics kafkaMetrics) {
final Stopwatch sw = Stopwatch.createStarted();
long previousPosition = -1;
java.time.Duration elapsed = java.time.Duration.ZERO;
java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout);
while (true) {
java.time.Duration elapsed = sw.elapsed();
final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(timeout.minus(elapsed));
kafkaMetrics.updateSuccessfulRpcMetrics(
topicPartition.topic(), java.time.Duration.ofMillis(elapsed.toMillis()));
if (!rawRecords.isEmpty()) {
// return as we have found some entries
return rawRecords;
Expand All @@ -598,7 +603,6 @@ private ConsumerRecords<byte[], byte[]> poll(
// there was no progress on the offset/position, which indicates end of stream
return rawRecords;
}
elapsed = sw.elapsed();
if (elapsed.toMillis() >= timeout.toMillis()) {
// timeout is over
LOG.warn(
Expand Down
Loading