Precisely measuring lag in a Spark streaming data pipeline to Delta Lake


How to correctly use the Spark StreamingQueryListener to capture component lag with Prometheus

At ZipRecruiter, we match millions of job seekers with relevant job recommendations every day, while providing data-rich recruitment services to thousands of businesses. Inherent to a seamless online user experience is the speed with which screens load and services are rendered.

When job seekers fill out details like location, education, or employment history, they see it in their profile almost instantly. Behind the scenes, however, this data must be made available for internal use so that we can match candidates to jobs using powerful machine learning models, offer intent-driven search tools, and more.

As we continue to scale, so does the pressure on our data processing pipelines. To ensure a high standard of service, we continuously monitor the freshness of our data at every stage in our pipeline.

The following article details how we create precise, realistic, and actionable metrics for monitoring our data processing lag. Reaching this point required changing our definition of lag several times and debunking a few misconceptions.

The Spark StreamingQueryListener and Prometheus documentation are a bit coy about both their limitations and their strengths. Resist the temptation to build your own custom app to solve this problem; the existing tools can do it. Here we go!

Defining Lag: The age of the oldest unprocessed record

At the outset, it is essential to distinguish between 'forward looking lag' and 'backward looking lag'.

  • Forward looking lag refers to how long it will take us to process all unprocessed data. This depends mostly on how much compute power is available and is quite flexible.
  • Backward looking lag indicates how "stale", outdated, or "fresh" our downstream data is.

Here we are concerned with backward looking lag, and therefore define lag as 'the age of the oldest unprocessed record.' The older the oldest unprocessed record, the larger the lag.
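
To make the definition concrete, here is a minimal sketch (assuming Scala 2.13; not production code) of how "the age of the oldest unprocessed record" becomes a number:

import java.time.{Duration, Instant}

// Backward looking lag: how old is the oldest record still waiting in the backlog?
// `unprocessedRecordTimes` is a hypothetical collection of creation times for unprocessed records.
def backwardLookingLag(now: Instant, unprocessedRecordTimes: Seq[Instant]): Duration =
  unprocessedRecordTimes.minOption                      // the oldest unprocessed record, if any
    .map(oldest => Duration.between(oldest, now))       // its age is the lag
    .getOrElse(Duration.ZERO)                           // an empty backlog means zero lag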

In batch processing pipelines, a certain amount of lag is acceptable, since data accumulates before a batch is processed and we also want to allow the system to retry batches when temporary errors occur. The degree of acceptable lag is derived from the SLO targets set. When there is no lag, all input data has been processed. When lag starts to grow over time, however, there is likely a blockage in the pipeline. The question is: where?

Monitoring each pipeline component individually makes lag metrics actionable

At the highest level, one could calculate 'System Lag'. System lag encompasses the end-to-end lag of the entire system, from the moment a record is created by a producer, the user in this case, to the moment that record rests in the data lake after processing. This data is readily available, as there is usually plenty of metadata on pipeline performance, including the record timestamp of when a job seeker clicked 'Save' and the timestamp of when data arrives at the app that puts it into the data lake.

System lag is great for a manager-level dashboard, but not for revealing or pinpointing a problem with a specific app or component in the pipeline (and identifying the right person to deal with it).

Therefore, we need to dive deeper and establish 'Component Lag'. As the name suggests, this is the lag related to each specific component in the pipeline individually. In effect, the age of data that has not yet entered that component.

To collect the required metadata on pipeline performance at the granular component level, we need to use a few specific tools.

Monitoring Spark query progress with checkpoints and versions

To maintain a degree of fault tolerance, Spark takes snapshots of its internal state, including metadata about data progressing along checkpoints in the pipeline. If a failure occurs during the processing of a streaming query, Spark can recover from the last successful checkpoint and continue processing from that point, i.e. the last successfully processed batch or record. This ensures that no data is lost and no duplicate processing occurs.

To identify batches of data as they cross checkpoints in a Spark job using Delta Lake, each batch's data is written into a different version of a Delta table.

Checkpoints occur at the "entry to" and "exit from" processing components. Each checkpoint logs which versions have crossed it, and that is how we can tell which data has successfully completed each stage in the pipeline.

Consider Figure 1. Four sets of data have been created, each receiving a version number 1..2..3..4. Version 1 has passed component 1 and checkpoint C has logged that state. Thus, if processing were to fail now, we can resume knowing that version 1 does not need to go through component 1 again. Versions 2, 3 and 4, however, are still waiting to be processed by component 1.

Figure 1. System lag vs. component lag in a Spark data processing pipeline, noting how checkpoints, versions and timestamps are related.
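
If you want to see these versions for yourself, Delta Lake keeps a commit history per table. A quick sketch (assuming a SparkSession named spark and the /tmp/dataset path used in the example code later in this post):

import io.delta.tables.DeltaTable

// Every append (and every maintenance operation) commits a new version of the table;
// the history shows when each version was created and by which operation.
DeltaTable.forPath(spark, "/tmp/dataset")
  .history(10)
  .select("version", "timestamp", "operation")
  .show(truncate = false)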

The Spark StreamingQueryListener collects checkpoint data on processed versions

The Spark StreamingQueryListener (SSQL) is the tool used to monitor and report on events in the processing pipeline. By listening at specific checkpoints in the pipeline, we can identify the versions of data entering and leaving specific components.
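
Concretely, each query progress event reports its sources' start and end offsets as JSON strings. For a Delta source, the end offset looks roughly like the following (the values are illustrative and the exact field set can vary between Delta Lake releases); the field we care about is reservoirVersion:

// Illustrative endOffset JSON as reported by a Delta source in a QueryProgressEvent.
// reservoirVersion is the Delta table version the micro-batch read up to.
val exampleEndOffset =
  """{"sourceVersion":1,"reservoirId":"11111111-2222-3333-4444-555555555555","reservoirVersion":42,"index":-1,"isStartingVersion":false}"""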

Specifically for Delta Lake, the SSQL makes sources.endOffset.reservoirVersion available, which we turn into a spark_delta_last_read_end_version metric. Plotting that, together with the "last produced delta_version" (which we'll show how to obtain later on), for a specific component in a Delta Lake batch processing job that runs every hour, the graph looks like this:

Figure 2. Version counters on the producer and consumer of a specific component.

The "last produced delta_version" counter (green) increments continuously as new data arrives from the producer (i.e., the user or a previous component). Once every hour, the consumer component processes all the data (versions) collected in its backlog. The next version arriving at the consumer becomes the new "last produced delta_version" and the beginning of lag.

In the example from Figure 1, we have a 3 version lag. What if it were a 50 version lag, or 500? Should we alert someone?

Here's an example where something unusual happened. Don't be put off by the "last consumed delta_version" being higher than the "last produced delta_version"; it's perfectly normal to have Delta Lake transactions whose housekeeping is done outside the data producer.

Just before 19:00, a new version was produced, but the consumer doesn't react until after 22:30. In this situation, the consumer is only behind by one version, but that version is large, and the actual wall-clock time of our processor's lag is over 3 hours.

Setting up monitoring and alerts based on the number of accumulated unprocessed versions is misleading, because a 50 version delay could simply mean that a number of other perfectly valid actions are being taken on the table, such as vacuuming. What's more, 50 or 500 versions might be processed very quickly, on the order of seconds, as processing rates vary greatly.

Monitoring lag via versions will create an unrealistic picture, resulting in overly sensitive alerts and unnecessary pages for on-call developers. And then you're on a slippery slope: one could dial the alerts down so much that when something genuinely bad does trigger an alert, the problem is already way out of hand and cannot be solved fast enough.

And so, converting this data into time units reflecting the age of the oldest unprocessed version is the way forward.

Converting version lag into time with Prometheus

To convert version lag into actionable time units, we use Prometheus, a time series database.

Identifying the last processed version

First, we must identify the last processed version. We use the Spark StreamingQueryListener-derived metric spark_delta_last_read_end_version. This metric is updated at the end of a component's successful run.

max(max_over_time(spark_delta_last_read_end_version{job="example_processor"}[1d]))

spark_delta_last_read_end_version comes from our DeltaStreamListenerMetrics class, which converts the Delta Lake structures that come through the Spark StreamingQueryListener into the last-processed table version number when the consumer has finished processing a batch. We use max_over_time() because the metric is 0 until processing is done, and max() to filter out series from earlier executions of the job within the last day. The source code is included in the Source code section below.

Finding WHEN the oldest unprocessed version was produced

Whenever a version exists that is higher than spark_delta_last_read_end_version, there may be unprocessed data. Note: some versions carry no data changes and could be excluded, but we will be conservative and ignore that for now. For the oldest unprocessed version, we must extract when it was made available to our consumer component; in effect, the version creation time at the producer.

Prometheus is able to extract information about when a version was created, but cannot provide the version ID.

The challenge, then, is to determine the time the oldest record was produced after the "last processed version". We can do this by using '>' to select all delta versions that are newer than the last processed version, and then taking the minimum timestamp of those delta versions.

min_over_time(
  (
      timestamp(delta_version{table="example_input_dataset"})
    and
      (
          delta_version{table="example_input_dataset"}
        >
          scalar(
            max(max_over_time(spark_delta_last_read_end_version{job="example_processor"} offset -1d[1d:]))
          )
      )
  )[1d:]
)

Working from the inside out: for the current PromQL evaluation interval, we take the current version of the Delta table and compare it with the latest processed version (spark_delta_last_read_end_version) over the entire following day (looking forward from the current evaluation interval: offset -1d[1d:]).

The ">" filters out any values where the processor is all caught up, essentially producing an expression that answers "at each evaluation interval, was there a newer delta_version that still needed processing?". Placing that filter condition on the right-hand side of the `and` keeps only the timestamp(…) samples from those evaluation periods. Then the outermost min_over_time finds the earliest time at which a yet-unprocessed version was available.

Crucially, you must make absolutely sure to use the correct table and job labels. Otherwise you run the risk of getting metrics that seem plausible but actually measure something else, or even complete nonsense, if, for example, the "example_processor" job doesn't actually read the "example_input_dataset" table.

Calculating the lag

Lag is simply the difference between the current time and the value computed above.

min(
    time()
  -
    min_over_time(
      (
          timestamp(delta_version{table="example_input_dataset"})
        and
          (
              (delta_version{table="example_input_dataset"})
            >
              scalar(
                max(max_over_time(spark_delta_last_read_end_version{job="example_processor"} offset -1d[1d:]))
              )
          )
      )[1d:]
    )
)

Creating a lag time series

To create a time series dataset that reliably captures the lag for historical versions, we set up a Prometheus recording rule. A recording rule produces a new time series from a complex expression, evaluated at each evaluation interval, so that historical values remain available. Otherwise, Prometheus has only a limited ability to look forward from earlier evaluation periods, which in practice means that without the recording rule we cannot see the lag for any versions before the last-processed one.

groups:
- name: rules
  rules:
  - record: Example_delta_lag_time
    expr: |
      min(
          time()
        -
          min_over_time(
            (
                timestamp(delta_version{table="example_input_dataset"})
              and
                (
                    (delta_version{table="example_input_dataset"})
                  >
                    scalar(
                      max(
                        max_over_time(
                          spark_delta_last_read_end_version{job="example_processor"} offset -1d[1d:]
                        )
                      )
                    )
                )
            )[1d:]
          )
      )

Setting up an alert

Now we can use the recorded time series in an alert expression.

  - alert: Example_Lag
    expr: Example_delta_lag_time > 10800
    for: 1h

A visualization of the age of the oldest unprocessed input to a component looks like this:

The age of the oldest unprocessed record increases over time. When the batch process runs, all versions are consumed and the next batch starts accumulating again.

Source code

Example producer:

import com.ziprecruiter.log.scala.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession}

import java.util.UUID
import scala.util.Random

object ExampleProducer {

  val logger: Logger = Logger[this.type]
  val spark = SparkSession
    .builder()
    .grasp("native")
    .config(
      new SparkConf()
        .setAppName("ExampleProducer")
        .set(
          "spark.sql.catalog.spark_catalog",
          "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        )
    )
    .getOrCreate()

  def randomUUID(): String = UUID.randomUUID().toString

  private def generateBatch(): Dataset[InputRecord] = {
    import spark.implicits._
    Seq.fill(Random.nextInt(10))(InputRecord(randomUUID())).toDS()
  }

  def main(args: Array[String]): Unit = {
    val newData = generateBatch()
    newData.write.format("delta").mode("append").save("/tmp/dataset")
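    // Delta records the version committed by the write above in
    // spark.databricks.delta.lastCommitVersionInSession; that number becomes our
    // "last produced delta_version" metric for this table.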
    val producedVersion =
      spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
    logger.info(
      "would push this metric",
      """delta_version{table="example_dataset"}""" -> producedVersion
    )
  }
}

case class InputRecord(id: String)

Example consumer:

import com.ziprecruiter.common.spark.metrics.SparkListenerMetrics
import com.ziprecruiter.log.scala.Logger
import com.ziprecruiter.metrics.prometheus.{
  PrometheusMetricConfig,
  PrometheusMetricPushGatewayConfig
}
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Dataset, SparkSession}
import com.ziprecruiter.metrics.MetricRegistry

object ExampleConsumer {

  val logger: Logger = Logger[this.type]
  val spark = SparkSession
    .builder()
    .grasp("native")
    .config(
      new SparkConf()
        .setAppName("ExampleConsumer")
        .set(
          "spark.sql.catalog.spark_catalog",
          "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        )
    )
    .getOrCreate()

  private def processInput(): Dataset[OutputRecord] = {
    import spark.implicits._
    spark.readStream
      .format("delta")
      .load("/tmp/dataset")
      .as[InputRecord]
      .map { x =>
        OutputRecord(id = x.id, status = "IS AWESOME")
      }
  }

  def main(args: Array[String]): Unit = {
    MetricRegistry.getRegistry(
      new PrometheusMetricConfig()
        .setIncludeJvmMetrics(true)
        .setPushGateway(new PrometheusMetricPushGatewayConfig())
    )
    spark.sparkContext.addSparkListener(new SparkListenerMetrics)
    val deltaMetrics = new DeltaStreamListenerMetrics()
    spark.streams.addListener(deltaMetrics)
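    // The listener above turns each query progress event into the
    // spark_delta_last_read_start/end_version gauges used in the PromQL queries.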

    processInput().writeStream
      .option("checkpointLocation", "/tmp/example-consumer-checkpoint")
      .foreachBatch { (batch: Dataset[OutputRecord], batchID: Long) =>
        logger.info("writing batch", "record_count" -> batch.count())
        batch.write.format("delta").mode("append").save("/tmp/consumer")
      }
      .trigger(Trigger.Once)
      .start()
      .awaitTermination()

    logger.info(
      "would push this metric",
      """delta_last_read_end_version""" -> deltaMetrics.deltaLastReadEndVersion
        .get()
    )
  }
}

case class OutputRecord(id: String, status: String)

DeltaStreamListenerMetrics (for the consumer):

import com.ziprecruiter.common.spark.metrics._
import com.ziprecruiter.metrics.MetricRegistry
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.json4s._
import org.json4s.jackson.JsonMethods._
import com.ziprecruiter.log.scala.Logger
import org.apache.spark.sql.delta.sources.DeltaSourceOffset

class DeltaStreamListenerMetrics extends StreamingQueryListener {

  private val logger: Logger = Logger[this.type]
  private val registry = MetricRegistry.getRegistry()

  val deltaLastReadStartVersion = registry.createGauge(
    registry
      .createNameBuilder()
      .setName("spark_delta_last_read_start_version")
      .setHelp(
        "Last Delta version that was read from a source during a stream process as the start offset"
      )
      .build()
  )

  val deltaLastReadEndVersion = registry.createGauge(
    registry
      .createNameBuilder()
      .setName("spark_delta_last_read_end_version")
      .setHelp(
        "Last Delta version that was read from a source during a stream process as an end offset"
      )
      .build()
  )

  override def onQueryProgress(
      event: StreamingQueryListener.QueryProgressEvent
  ): Unit = {
    if (event.progress.sources.size > 0) {
      val startOffsetJson: String = event.progress.sources(0).startOffset
      val endOffsetJson: String = event.progress.sources(0).endOffset

      val startVersion = offsetInfoToDeltaVersion(startOffsetJson)
      val endVersion = offsetInfoToDeltaVersion(endOffsetJson)

      startVersion match {
        case Some(i) => {
          deltaLastReadStartVersion.set(i)
          logger.info(
            "onQueryProgress",
            "readStartDeltaVersion" -> startVersion
          )
        }
        case None =>
          logger.error("onQueryProgress failed to parse delta versions")
      }

      endVersion match {
        case Some(i) => {
          deltaLastReadEndVersion.set(i)
          logger.information("readEndDeltaVersion" -> startVersion)
        }
        case None =>
          logger.error("onQueryProgress ailed to parse delta variations")
      }
    }
  }

  def offsetInfoToDeltaVersion(offsetInfo: String): Option[Long] = {
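    // The listener receives start/end offsets as JSON strings; parse them into a
    // DeltaSourceOffset and return its reservoirVersion, i.e. the Delta table version.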

    implicit val formats = DefaultFormats
    if (offsetInfo == null || offsetInfo == "") {
      logger.warn(
        "offsetInfoToDeltaVersion: offsetInfo is empty. Cannot retrieve read version"
      )
      return None
    }
    try {
      val offsetJson: JValue = parse(offsetInfo)
      val startOffset = offsetJson.extractOrElse[DeltaSourceOffset](null)
      if (startOffset == null) {
        logger.warn("offsetInfoToDeltaVersion couldn't extract model information")
        None
      } else {
        Some(startOffset.reservoirVersion)
      }
    } catch {
      case e: Exception =>
        logger.error("offsetInfoToDeltaVersion failed", "error" -> e)
        e.printStackTrace()
        None
    }
  }

  override def onQueryStarted(
      event: StreamingQueryListener.QueryStartedEvent
  ): Unit = {
    logger.info(
      "onQueryStarted",
      "event" -> s"${event.id}, ${event.runId}, ${event.name}, ${event.timestamp}"
    )
  }

  override def onQueryTerminated(
      event: StreamingQueryListener.QueryTerminatedEvent
  ): Unit = {
    logger.info(
      "onQueryTerminated",
      "event" -> s"${event.id}, ${event.runId}, ${event.exception.getOrElse("")}"
    )
  }
}

Setting up alerts with realistic lag targets

When rolling out any new system, it is prudent to set highly sensitive alert thresholds and gradually relax them as the system stabilizes.

Of course, lag should be as low as possible, but we also want to avoid creating unnecessary pressure to achieve overly ambitious latencies. The system must be allowed to fix itself (retry on failure), tolerate some outages, and not "wake people up" unless it's truly critical.

In the end, we want to make sure that the lag (the age of the oldest unprocessed version) for each component (and for all components cumulatively) is well under the SLO time for that service.

In order to meet the target SLO, we can make a job run more frequently. This will change the version rate, but since we monitor in units of time, our monitoring is not skewed.
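
With Spark Structured Streaming, running more frequently can be as simple as changing the trigger. A sketch, reusing processInput() and the paths from the example consumer above (the 15 minute interval is illustrative):

import org.apache.spark.sql.streaming.Trigger

// Instead of Trigger.Once on an hourly schedule, drain the backlog every 15 minutes.
// Pick an interval that keeps the component lag comfortably under the SLO.
processInput().writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/example-consumer-checkpoint")
  .option("path", "/tmp/consumer")
  .trigger(Trigger.ProcessingTime("15 minutes"))
  .start()
  .awaitTermination()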

As a final note, remember: when a component in the pipeline shows a conspicuously consistent zero lag, be sure to go back upstream and confirm there is not a blockage in an earlier component.

If you're interested in working on solutions like these, visit our Careers page to see open roles.

* * *

About the author:

Jeff Rhyason is a Senior Software Engineer at ZipRecruiter on the Jobseeker Experience team, where he modernizes our datastores and APIs so teams can work more independently. Jeff's team is fully remote, distributed across Canada and the US. They're a tight-knit group, and he loves wrangling curly braces with such a talented and diverse bunch. In his off hours you'll find Jeff cycling around Vancouver, Canada, fueled by a steady diet of kombucha.


