{"version":3,"file":"aggregator.js","sources":["../../../src/metrics/aggregator.ts"],"sourcesContent":["import type { Client, MeasurementUnit, MetricsAggregator as MetricsAggregatorBase, Primitive } from '@sentry/types';\nimport { timestampInSeconds } from '@sentry/utils';\nimport { updateMetricSummaryOnActiveSpan } from '../utils/spanUtils';\nimport { DEFAULT_FLUSH_INTERVAL, MAX_WEIGHT, SET_METRIC_TYPE } from './constants';\nimport { captureAggregateMetrics } from './envelope';\nimport { METRIC_MAP } from './instance';\nimport type { MetricBucket, MetricType } from './types';\nimport { getBucketKey, sanitizeMetricKey, sanitizeTags, sanitizeUnit } from './utils';\n\n/**\n * A metrics aggregator that aggregates metrics in memory and flushes them periodically.\n */\nexport class MetricsAggregator implements MetricsAggregatorBase {\n // TODO(@anonrig): Use FinalizationRegistry to have a proper way of flushing the buckets\n // when the aggregator is garbage collected.\n // Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry\n private _buckets: MetricBucket;\n\n // Different metrics have different weights. We use this to limit the number of metrics\n // that we store in memory.\n private _bucketsTotalWeight;\n\n // Cast to any so that it can use Node.js timeout\n // eslint-disable-next-line @typescript-eslint/no-explicit-any\n private readonly _interval: any;\n\n // SDKs are required to shift the flush interval by random() * rollup_in_seconds.\n // That shift is determined once per startup to create jittering.\n private readonly _flushShift: number;\n\n // An SDK is required to perform force flushing ahead of scheduled time if the memory\n // pressure is too high. There is no rule for this other than that SDKs should be tracking\n // abstract aggregation complexity (eg: a counter only carries a single float, whereas a\n // distribution is a float per emission).\n //\n // Force flush is used on either shutdown, flush() or when we exceed the max weight.\n private _forceFlush: boolean;\n\n public constructor(private readonly _client: Client) {\n this._buckets = new Map();\n this._bucketsTotalWeight = 0;\n\n // eslint-disable-next-line @typescript-eslint/no-explicit-any\n this._interval = setInterval(() => this._flush(), DEFAULT_FLUSH_INTERVAL) as any;\n // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access\n if (this._interval.unref) {\n // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access\n this._interval.unref();\n }\n\n this._flushShift = Math.floor((Math.random() * DEFAULT_FLUSH_INTERVAL) / 1000);\n this._forceFlush = false;\n }\n\n /**\n * @inheritDoc\n */\n public add(\n metricType: MetricType,\n unsanitizedName: string,\n value: number | string,\n unsanitizedUnit: MeasurementUnit = 'none',\n unsanitizedTags: Record = {},\n maybeFloatTimestamp = timestampInSeconds(),\n ): void {\n const timestamp = Math.floor(maybeFloatTimestamp);\n const name = sanitizeMetricKey(unsanitizedName);\n const tags = sanitizeTags(unsanitizedTags);\n const unit = sanitizeUnit(unsanitizedUnit as string);\n\n const bucketKey = getBucketKey(metricType, name, unit, tags);\n\n let bucketItem = this._buckets.get(bucketKey);\n // If this is a set metric, we need to calculate the delta from the previous weight.\n const previousWeight = bucketItem && metricType === SET_METRIC_TYPE ? bucketItem.metric.weight : 0;\n\n if (bucketItem) {\n bucketItem.metric.add(value);\n // TODO(abhi): Do we need this check?\n if (bucketItem.timestamp < timestamp) {\n bucketItem.timestamp = timestamp;\n }\n } else {\n bucketItem = {\n // @ts-expect-error we don't need to narrow down the type of value here, saves bundle size.\n metric: new METRIC_MAP[metricType](value),\n timestamp,\n metricType,\n name,\n unit,\n tags,\n };\n this._buckets.set(bucketKey, bucketItem);\n }\n\n // If value is a string, it's a set metric so calculate the delta from the previous weight.\n const val = typeof value === 'string' ? bucketItem.metric.weight - previousWeight : value;\n updateMetricSummaryOnActiveSpan(metricType, name, val, unit, unsanitizedTags, bucketKey);\n\n // We need to keep track of the total weight of the buckets so that we can\n // flush them when we exceed the max weight.\n this._bucketsTotalWeight += bucketItem.metric.weight;\n\n if (this._bucketsTotalWeight >= MAX_WEIGHT) {\n this.flush();\n }\n }\n\n /**\n * Flushes the current metrics to the transport via the transport.\n */\n public flush(): void {\n this._forceFlush = true;\n this._flush();\n }\n\n /**\n * Shuts down metrics aggregator and clears all metrics.\n */\n public close(): void {\n this._forceFlush = true;\n clearInterval(this._interval);\n this._flush();\n }\n\n /**\n * Flushes the buckets according to the internal state of the aggregator.\n * If it is a force flush, which happens on shutdown, it will flush all buckets.\n * Otherwise, it will only flush buckets that are older than the flush interval,\n * and according to the flush shift.\n *\n * This function mutates `_forceFlush` and `_bucketsTotalWeight` properties.\n */\n private _flush(): void {\n // TODO(@anonrig): Add Atomics for locking to avoid having force flush and regular flush\n // running at the same time.\n // Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics\n\n // This path eliminates the need for checking for timestamps since we're forcing a flush.\n // Remember to reset the flag, or it will always flush all metrics.\n if (this._forceFlush) {\n this._forceFlush = false;\n this._bucketsTotalWeight = 0;\n this._captureMetrics(this._buckets);\n this._buckets.clear();\n return;\n }\n const cutoffSeconds = Math.floor(timestampInSeconds()) - DEFAULT_FLUSH_INTERVAL / 1000 - this._flushShift;\n // TODO(@anonrig): Optimization opportunity.\n // Convert this map to an array and store key in the bucketItem.\n const flushedBuckets: MetricBucket = new Map();\n for (const [key, bucket] of this._buckets) {\n if (bucket.timestamp <= cutoffSeconds) {\n flushedBuckets.set(key, bucket);\n this._bucketsTotalWeight -= bucket.metric.weight;\n }\n }\n\n for (const [key] of flushedBuckets) {\n this._buckets.delete(key);\n }\n\n this._captureMetrics(flushedBuckets);\n }\n\n /**\n * Only captures a subset of the buckets passed to this function.\n * @param flushedBuckets\n */\n private _captureMetrics(flushedBuckets: MetricBucket): void {\n if (flushedBuckets.size > 0) {\n // TODO(@anonrig): Optimization opportunity.\n // This copy operation can be avoided if we store the key in the bucketItem.\n const buckets = Array.from(flushedBuckets).map(([, bucketItem]) => bucketItem);\n captureAggregateMetrics(this._client, buckets);\n }\n }\n}\n"],"names":[],"mappings":";;;;;;;AASA;AACA;AACA;AACO,MAAM,mBAAmD;AAChE;AACA;AACA;;AAGA;AACA;;AAGA;AACA;;AAGA;AACA;;AAGA;AACA;AACA;AACA;AACA;AACA;;AAGA,GAAS,WAAW,GAAkB,OAAO,EAAU,CAAA,IAAA,CAAA,OAAA,GAAA,OAAA;AACvD,IAAI,IAAI,CAAC,QAAA,GAAW,IAAI,GAAG,EAAE;AAC7B,IAAI,IAAI,CAAC,mBAAoB,GAAE,CAAC;;AAEhC;AACA,IAAI,IAAI,CAAC,SAAA,GAAY,WAAW,CAAC,MAAM,IAAI,CAAC,MAAM,EAAE,EAAE,sBAAsB,CAAE;AAC9E;AACA,IAAI,IAAI,IAAI,CAAC,SAAS,CAAC,KAAK,EAAE;AAC9B;AACA,MAAM,IAAI,CAAC,SAAS,CAAC,KAAK,EAAE;AAC5B;;AAEA,IAAI,IAAI,CAAC,WAAA,GAAc,IAAI,CAAC,KAAK,CAAC,CAAC,IAAI,CAAC,MAAM,EAAG,GAAE,sBAAsB,IAAI,IAAI,CAAC;AAClF,IAAI,IAAI,CAAC,WAAY,GAAE,KAAK;AAC5B;;AAEA;AACA;AACA;AACA,GAAS,GAAG;AACZ,IAAI,UAAU;AACd,IAAI,eAAe;AACnB,IAAI,KAAK;AACT,IAAI,eAAe,GAAoB,MAAM;AAC7C,IAAI,eAAe,GAA8B,EAAE;AACnD,IAAI,mBAAoB,GAAE,kBAAkB,EAAE;AAC9C,IAAU;AACV,IAAI,MAAM,YAAY,IAAI,CAAC,KAAK,CAAC,mBAAmB,CAAC;AACrD,IAAI,MAAM,IAAK,GAAE,iBAAiB,CAAC,eAAe,CAAC;AACnD,IAAI,MAAM,IAAK,GAAE,YAAY,CAAC,eAAe,CAAC;AAC9C,IAAI,MAAM,IAAK,GAAE,YAAY,CAAC,iBAA0B;;AAExD,IAAI,MAAM,SAAA,GAAY,YAAY,CAAC,UAAU,EAAE,IAAI,EAAE,IAAI,EAAE,IAAI,CAAC;;AAEhE,IAAI,IAAI,UAAW,GAAE,IAAI,CAAC,QAAQ,CAAC,GAAG,CAAC,SAAS,CAAC;AACjD;AACA,IAAI,MAAM,cAAA,GAAiB,UAAA,IAAc,UAAW,KAAI,eAAgB,GAAE,UAAU,CAAC,MAAM,CAAC,MAAA,GAAS,CAAC;;AAEtG,IAAI,IAAI,UAAU,EAAE;AACpB,MAAM,UAAU,CAAC,MAAM,CAAC,GAAG,CAAC,KAAK,CAAC;AAClC;AACA,MAAM,IAAI,UAAU,CAAC,SAAU,GAAE,SAAS,EAAE;AAC5C,QAAQ,UAAU,CAAC,SAAU,GAAE,SAAS;AACxC;AACA,WAAW;AACX,MAAM,aAAa;AACnB;AACA,QAAQ,MAAM,EAAE,IAAI,UAAU,CAAC,UAAU,CAAC,CAAC,KAAK,CAAC;AACjD,QAAQ,SAAS;AACjB,QAAQ,UAAU;AAClB,QAAQ,IAAI;AACZ,QAAQ,IAAI;AACZ,QAAQ,IAAI;AACZ,OAAO;AACP,MAAM,IAAI,CAAC,QAAQ,CAAC,GAAG,CAAC,SAAS,EAAE,UAAU,CAAC;AAC9C;;AAEA;AACA,IAAI,MAAM,GAAI,GAAE,OAAO,KAAA,KAAU,QAAS,GAAE,UAAU,CAAC,MAAM,CAAC,MAAA,GAAS,cAAA,GAAiB,KAAK;AAC7F,IAAI,+BAA+B,CAAC,UAAU,EAAE,IAAI,EAAE,GAAG,EAAE,IAAI,EAAE,eAAe,EAAE,SAAS,CAAC;;AAE5F;AACA;AACA,IAAI,IAAI,CAAC,mBAAoB,IAAG,UAAU,CAAC,MAAM,CAAC,MAAM;;AAExD,IAAI,IAAI,IAAI,CAAC,mBAAoB,IAAG,UAAU,EAAE;AAChD,MAAM,IAAI,CAAC,KAAK,EAAE;AAClB;AACA;;AAEA;AACA;AACA;AACA,GAAS,KAAK,GAAS;AACvB,IAAI,IAAI,CAAC,WAAY,GAAE,IAAI;AAC3B,IAAI,IAAI,CAAC,MAAM,EAAE;AACjB;;AAEA;AACA;AACA;AACA,GAAS,KAAK,GAAS;AACvB,IAAI,IAAI,CAAC,WAAY,GAAE,IAAI;AAC3B,IAAI,aAAa,CAAC,IAAI,CAAC,SAAS,CAAC;AACjC,IAAI,IAAI,CAAC,MAAM,EAAE;AACjB;;AAEA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA,GAAU,MAAM,GAAS;AACzB;AACA;AACA;;AAEA;AACA;AACA,IAAI,IAAI,IAAI,CAAC,WAAW,EAAE;AAC1B,MAAM,IAAI,CAAC,WAAY,GAAE,KAAK;AAC9B,MAAM,IAAI,CAAC,mBAAoB,GAAE,CAAC;AAClC,MAAM,IAAI,CAAC,eAAe,CAAC,IAAI,CAAC,QAAQ,CAAC;AACzC,MAAM,IAAI,CAAC,QAAQ,CAAC,KAAK,EAAE;AAC3B,MAAM;AACN;AACA,IAAI,MAAM,aAAc,GAAE,IAAI,CAAC,KAAK,CAAC,kBAAkB,EAAE,CAAA,GAAI,sBAAuB,GAAE,OAAO,IAAI,CAAC,WAAW;AAC7G;AACA;AACA,IAAI,MAAM,cAAc,GAAiB,IAAI,GAAG,EAAE;AAClD,IAAI,KAAK,MAAM,CAAC,GAAG,EAAE,MAAM,CAAA,IAAK,IAAI,CAAC,QAAQ,EAAE;AAC/C,MAAM,IAAI,MAAM,CAAC,SAAU,IAAG,aAAa,EAAE;AAC7C,QAAQ,cAAc,CAAC,GAAG,CAAC,GAAG,EAAE,MAAM,CAAC;AACvC,QAAQ,IAAI,CAAC,mBAAoB,IAAG,MAAM,CAAC,MAAM,CAAC,MAAM;AACxD;AACA;;AAEA,IAAI,KAAK,MAAM,CAAC,GAAG,CAAE,IAAG,cAAc,EAAE;AACxC,MAAM,IAAI,CAAC,QAAQ,CAAC,MAAM,CAAC,GAAG,CAAC;AAC/B;;AAEA,IAAI,IAAI,CAAC,eAAe,CAAC,cAAc,CAAC;AACxC;;AAEA;AACA;AACA;AACA;AACA,GAAU,eAAe,CAAC,cAAc,EAAsB;AAC9D,IAAI,IAAI,cAAc,CAAC,IAAK,GAAE,CAAC,EAAE;AACjC;AACA;AACA,MAAM,MAAM,UAAU,KAAK,CAAC,IAAI,CAAC,cAAc,CAAC,CAAC,GAAG,CAAC,CAAC,GAAG,UAAU,CAAC,KAAK,UAAU,CAAC;AACpF,MAAM,uBAAuB,CAAC,IAAI,CAAC,OAAO,EAAE,OAAO,CAAC;AACpD;AACA;AACA;;;;"}