-rw-r--r--  .ci/metrics/metrics.py | 108 ++++++++++++++++++++++++++++++++-----
 1 file changed, 94 insertions(+), 14 deletions(-)
diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index 8edc00b..48d2aa2 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -26,7 +26,67 @@ class JobMetrics:
     workflow_id: int
 
 
-def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, int]):
+@dataclass
+class GaugeMetric:
+    name: str
+    value: int
+    time_ns: int
+
+
+def get_sampled_workflow_metrics(github_repo: github.Repository):
+    """Gets global statistics about the Github workflow queue.
+
+    Args:
+      github_repo: A github repo object to use to query the relevant information.
+
+    Returns:
+      Returns a list of GaugeMetric objects, containing the relevant metrics
+      about the workflow queue.
+    """
+
+    # Other states are available (pending, waiting, etc.), but their meaning
+    # is not documented (see #70540). "queued" seems to be the state we want.
+    queued_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="queued")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+    running_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="in_progress")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+
+    workflow_metrics = []
+    workflow_metrics.append(
+        GaugeMetric(
+            "workflow_queue_size",
+            queued_workflow_count,
+            time.time_ns(),
+        )
+    )
+    workflow_metrics.append(
+        GaugeMetric(
+            "running_workflow_count",
+            running_workflow_count,
+            time.time_ns(),
+        )
+    )
+    # Always send a heartbeat metric so we can monitor if this container is
+    # still able to log to Grafana.
+    workflow_metrics.append(
+        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+    )
+    return workflow_metrics
+
+
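The sampler above returns a flat list of gauges. For context, a minimal sketch of its output, with made-up counts and the timestamps elided:

    [
        GaugeMetric(name="workflow_queue_size", value=2, time_ns=...),
        GaugeMetric(name="running_workflow_count", value=5, time_ns=...),
        GaugeMetric(name="metrics_container_heartbeat", value=1, time_ns=...),
    ]

Note that both counts are filtered by run name against WORKFLOWS_TO_TRACK, so the gauges only cover the workflows this container is configured to watch.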
+def get_per_workflow_metrics(
+    github_repo: github.Repository, workflows_to_track: dict[str, int]
+):
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the
@@ -43,14 +103,14 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
      Returns a list of JobMetrics objects, containing the relevant metrics about
      the workflow.
    """
-    workflow_runs = iter(github_repo.get_workflow_runs())
-
     workflow_metrics = []
     workflows_to_include = set(workflows_to_track.keys())
 
-    while len(workflows_to_include) > 0:
-        workflow_run = next(workflow_runs)
+    for workflow_run in iter(github_repo.get_workflow_runs()):
+        if len(workflows_to_include) == 0:
+            break
+
         if workflow_run.status != "completed":
             continue
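Beyond renaming get_metrics, the rewritten loop removes a latent crash: next() on an exhausted iterator raises StopIteration, so the old while loop could die if some tracked workflow never appeared in the run list. A minimal sketch of the difference, assuming the run list comes back empty:

    runs = iter([])
    workflow_run = next(runs)  # old pattern: raises StopIteration

    for workflow_run in iter([]):  # new pattern: body simply never executes
        ...

The explicit break keeps the new form bounded as well: iteration stops as soon as every tracked workflow has been seen once.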
@@ -139,12 +199,27 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
      metrics_userid: The userid to use for the upload.
      api_key: The API key to use for the upload.
    """
+
+    if len(workflow_metrics) == 0:
+        print("No metrics found to upload.", file=sys.stderr)
+        return
+
     metrics_batch = []
     for workflow_metric in workflow_metrics:
-        workflow_formatted_name = workflow_metric.job_name.lower().replace(" ", "_")
-        metrics_batch.append(
-            f"{workflow_formatted_name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
-        )
+        if isinstance(workflow_metric, GaugeMetric):
+            name = workflow_metric.name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
+            )
+        elif isinstance(workflow_metric, JobMetrics):
+            name = workflow_metric.job_name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
+            )
+        else:
+            raise ValueError(
+                f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}"
+            )
 
     request_data = "\n".join(metrics_batch)
     response = requests.post(
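The strings batched above are InfluxDB line protocol, which the Grafana push endpoint accepts: a measurement name, field=value pairs, then a nanosecond timestamp. With made-up values, the two branches produce lines like:

    workflow_queue_size value=2 1736456400000000000
    check_code_formatting queue_time=42,run_time=360,status=1 1736456400000000000

The lower().replace(" ", "_") normalization is what turns a display name such as "Check code formatting" into the check_code_formatting measurement.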
@@ -176,16 +251,21 @@ def main():
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
     while True:
-        current_metrics = get_metrics(github_repo, workflows_to_track)
-        if len(current_metrics) == 0:
-            print("No metrics found to upload.", file=sys.stderr)
-            continue
+        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
+        current_metrics += get_sampled_workflow_metrics(github_repo)
+        # Always send a heartbeat metric so we can monitor if this container is
+        # still able to log to Grafana.
+        current_metrics.append(
+            GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+        )
 
         upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
         print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr)
 
         for workflow_metric in reversed(current_metrics):
-            workflows_to_track[workflow_metric.job_name] = workflow_metric.workflow_id
+            if isinstance(workflow_metric, JobMetrics):
+                workflows_to_track[
+                    workflow_metric.job_name
+                ] = workflow_metric.workflow_id
 
         time.sleep(SCRAPE_INTERVAL_SECONDS)
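The requests.post call in the upload_metrics hunk is cut off by the diff's context window. For reference, a sketch of how such an upload plausibly completes, assuming a module-level GRAFANA_URL constant and HTTP basic auth, neither of which is visible in this diff:

    response = requests.post(
        GRAFANA_URL,  # assumed endpoint constant, not shown in this diff
        auth=(metrics_userid, api_key),  # Grafana userid + API key as basic auth
        data=request_data,  # the newline-joined line-protocol batch
    )
    if response.status_code < 200 or response.status_code >= 300:
        print(
            f"Failed to submit data to Grafana: {response.status_code}",
            file=sys.stderr,
        )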