{"version":3,"file":"BatchSpanProcessorBase.js","sourceRoot":"","sources":["../../../src/export/BatchSpanProcessorBase.ts"],"names":[],"mappings":";AAAA;;;;;;;;;;;;;;GAcG;;;AAEH,4CAAwE;AACxE,8CAO6B;AAO7B;;;GAGG;AACH,MAAsB,sBAAsB;IAc1C,YACmB,SAAuB,EACxC,MAAU;QADO,cAAS,GAAT,SAAS,CAAc;QAPlC,iBAAY,GAAG,KAAK,CAAC;QACrB,mBAAc,GAAmB,EAAE,CAAC;QAGpC,uBAAkB,GAAW,CAAC,CAAC;QAMrC,MAAM,GAAG,GAAG,IAAA,aAAM,GAAE,CAAC;QACrB,IAAI,CAAC,mBAAmB;YACtB,OAAO,CAAA,MAAM,aAAN,MAAM,uBAAN,MAAM,CAAE,kBAAkB,CAAA,KAAK,QAAQ;gBAC5C,CAAC,CAAC,MAAM,CAAC,kBAAkB;gBAC3B,CAAC,CAAC,GAAG,CAAC,8BAA8B,CAAC;QACzC,IAAI,CAAC,aAAa;YAChB,OAAO,CAAA,MAAM,aAAN,MAAM,uBAAN,MAAM,CAAE,YAAY,CAAA,KAAK,QAAQ;gBACtC,CAAC,CAAC,MAAM,CAAC,YAAY;gBACrB,CAAC,CAAC,GAAG,CAAC,uBAAuB,CAAC;QAClC,IAAI,CAAC,qBAAqB;YACxB,OAAO,CAAA,MAAM,aAAN,MAAM,uBAAN,MAAM,CAAE,oBAAoB,CAAA,KAAK,QAAQ;gBAC9C,CAAC,CAAC,MAAM,CAAC,oBAAoB;gBAC7B,CAAC,CAAC,GAAG,CAAC,uBAAuB,CAAC;QAClC,IAAI,CAAC,oBAAoB;YACvB,OAAO,CAAA,MAAM,aAAN,MAAM,uBAAN,MAAM,CAAE,mBAAmB,CAAA,KAAK,QAAQ;gBAC7C,CAAC,CAAC,MAAM,CAAC,mBAAmB;gBAC5B,CAAC,CAAC,GAAG,CAAC,uBAAuB,CAAC;QAElC,IAAI,CAAC,aAAa,GAAG,IAAI,qBAAc,CAAC,IAAI,CAAC,SAAS,EAAE,IAAI,CAAC,CAAC;QAE9D,IAAI,IAAI,CAAC,mBAAmB,GAAG,IAAI,CAAC,aAAa,EAAE;YACjD,UAAI,CAAC,IAAI,CACP,mIAAmI,CACpI,CAAC;YACF,IAAI,CAAC,mBAAmB,GAAG,IAAI,CAAC,aAAa,CAAC;SAC/C;IACH,CAAC;IAED,UAAU;QACR,IAAI,IAAI,CAAC,aAAa,CAAC,QAAQ,EAAE;YAC/B,OAAO,IAAI,CAAC,aAAa,CAAC,OAAO,CAAC;SACnC;QACD,OAAO,IAAI,CAAC,SAAS,EAAE,CAAC;IAC1B,CAAC;IAED,gBAAgB;IAChB,OAAO,CAAC,KAAW,EAAE,cAAuB,IAAS,CAAC;IAEtD,KAAK,CAAC,IAAkB;QACtB,IAAI,IAAI,CAAC,aAAa,CAAC,QAAQ,EAAE;YAC/B,OAAO;SACR;QAED,IAAI,CAAC,IAAI,CAAC,WAAW,EAAE,CAAC,UAAU,GAAG,gBAAU,CAAC,OAAO,CAAC,KAAK,CAAC,EAAE;YAC9D,OAAO;SACR;QAED,IAAI,CAAC,YAAY,CAAC,IAAI,CAAC,CAAC;IAC1B,CAAC;IAED,QAAQ;QACN,OAAO,IAAI,CAAC,aAAa,CAAC,IAAI,EAAE,CAAC;IACnC,CAAC;IAEO,SAAS;QACf,OAAO,OAAO,CAAC,OAAO,EAAE;aACrB,IAAI,CAAC,GAAG,EAAE;YACT,OAAO,IAAI,CAAC,UAAU,EAAE,CAAC;QAC3B,CAAC,CAAC;aACD,IAAI,CAAC,GAAG,EAAE;YACT,OAAO,IAAI,CAAC,SAAS,EAAE,CAAC;QAC1B,CAAC,CAAC;aACD,IAAI,CAAC,GAAG,EAAE;YACT,OAAO,IAAI,CAAC,SAAS,CAAC,QAAQ,EAAE,CAAC;QACnC,CAAC,CAAC,CAAC;IACP,CAAC;IAED,gCAAgC;IACxB,YAAY,CAAC,IAAkB;QACrC,IAAI,IAAI,CAAC,cAAc,CAAC,MAAM,IAAI,IAAI,CAAC,aAAa,EAAE;YACpD,2BAA2B;YAE3B,IAAI,IAAI,CAAC,kBAAkB,KAAK,CAAC,EAAE;gBACjC,UAAI,CAAC,KAAK,CAAC,sCAAsC,CAAC,CAAC;aACpD;YACD,IAAI,CAAC,kBAAkB,EAAE,CAAC;YAE1B,OAAO;SACR;QAED,IAAI,IAAI,CAAC,kBAAkB,GAAG,CAAC,EAAE;YAC/B,gEAAgE;YAChE,UAAI,CAAC,IAAI,CACP,WAAW,IAAI,CAAC,kBAAkB,qCAAqC,CACxE,CAAC;YACF,IAAI,CAAC,kBAAkB,GAAG,CAAC,CAAC;SAC7B;QAED,IAAI,CAAC,cAAc,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC;QAC/B,IAAI,CAAC,gBAAgB,EAAE,CAAC;IAC1B,CAAC;IAED;;;;SAIK;IACG,SAAS;QACf,OAAO,IAAI,OAAO,CAAC,CAAC,OAAO,EAAE,MAAM,EAAE,EAAE;YACrC,MAAM,QAAQ,GAAG,EAAE,CAAC;YACpB,8BAA8B;YAC9B,MAAM,KAAK,GAAG,IAAI,CAAC,IAAI,CACrB,IAAI,CAAC,cAAc,CAAC,MAAM,GAAG,IAAI,CAAC,mBAAmB,CACtD,CAAC;YACF,KAAK,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,GAAG,KAAK,EAAE,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE,EAAE;gBACrC,QAAQ,CAAC,IAAI,CAAC,IAAI,CAAC,cAAc,EAAE,CAAC,CAAC;aACtC;YACD,OAAO,CAAC,GAAG,CAAC,QAAQ,CAAC;iBAClB,IAAI,CAAC,GAAG,EAAE;gBACT,OAAO,EAAE,CAAC;YACZ,CAAC,CAAC;iBACD,KAAK,CAAC,MAAM,CAAC,CAAC;QACnB,CAAC,CAAC,CAAC;IACL,CAAC;IAEO,cAAc;QACpB,IAAI,CAAC,WAAW,EAAE,CAAC;QACnB,IAAI,IAAI,CAAC,cAAc,CAAC,MAAM,KAAK,CAAC,EAAE;YACpC,OAAO,OAAO,CAAC,OAAO,EAAE,CAAC;SAC1B;QACD,OAAO,IAAI,OAAO,CAAC,CAAC,OAAO,EAAE,MAAM,EAAE,EAAE;YACrC,MAAM,KAAK,GAAG,UAAU,CAAC,GAAG,EAAE;gBAC5B,mEAAmE;gBACnE,MAAM,CAAC,IAAI,KAAK,CAAC,SAAS,CAAC,CAAC,CAAC;YAC/B,CAAC,EAAE,IAAI,CAAC,oBAAoB,CAAC,CAAC;YAC9B,0DAA0D;YAC1D,aAAO,CAAC,IAAI,CAAC,IAAA,sBAAe,EAAC,aAAO,CAAC,MAAM,EAAE,CAAC,EAAE,GAAG,EAAE;gBACnD,yFAAyF;gBACzF,8EAA8E;gBAC9E,0CAA0C;gBAC1C,IAAI,KAAqB,CAAC;gBAC1B,IAAI,IAAI,CAAC,cAAc,CAAC,MAAM,IAAI,IAAI,CAAC,mBAAmB,EAAE;oBAC1D,KAAK,GAAG,IAAI,CAAC,cAAc,CAAC;oBAC5B,IAAI,CAAC,cAAc,GAAG,EAAE,CAAC;iBAC1B;qBAAM;oBACL,KAAK,GAAG,IAAI,CAAC,cAAc,CAAC,MAAM,CAAC,CAAC,EAAE,IAAI,CAAC,mBAAmB,CAAC,CAAC;iBACjE;gBAED,MAAM,QAAQ,GAAG,GAAG,EAAE,CACpB,IAAI,CAAC,SAAS,CAAC,MAAM,CAAC,KAAK,EAAE,MAAM,CAAC,EAAE;;oBACpC,YAAY,CAAC,KAAK,CAAC,CAAC;oBACpB,IAAI,MAAM,CAAC,IAAI,KAAK,uBAAgB,CAAC,OAAO,EAAE;wBAC5C,OAAO,EAAE,CAAC;qBACX;yBAAM;wBACL,MAAM,CACJ,MAAA,MAAM,CAAC,KAAK,mCACV,IAAI,KAAK,CAAC,wCAAwC,CAAC,CACtD,CAAC;qBACH;gBACH,CAAC,CAAC,CAAC;gBAEL,IAAI,gBAAgB,GAAgC,IAAI,CAAC;gBACzD,KAAK,IAAI,CAAC,GAAG,CAAC,EAAE,GAAG,GAAG,KAAK,CAAC,MAAM,EAAE,CAAC,GAAG,GAAG,EAAE,CAAC,EAAE,EAAE;oBAChD,MAAM,IAAI,GAAG,KAAK,CAAC,CAAC,CAAC,CAAC;oBACtB,IACE,IAAI,CAAC,QAAQ,CAAC,sBAAsB;wBACpC,IAAI,CAAC,QAAQ,CAAC,sBAAsB,EACpC;wBACA,gBAAgB,aAAhB,gBAAgB,cAAhB,gBAAgB,IAAhB,gBAAgB,GAAK,EAAE,EAAC;wBACxB,gBAAgB,CAAC,IAAI,CAAC,IAAI,CAAC,QAAQ,CAAC,sBAAsB,EAAE,CAAC,CAAC;qBAC/D;iBACF;gBAED,sFAAsF;gBACtF,IAAI,gBAAgB,KAAK,IAAI,EAAE;oBAC7B,QAAQ,EAAE,CAAC;iBACZ;qBAAM;oBACL,OAAO,CAAC,GAAG,CAAC,gBAAgB,CAAC,CAAC,IAAI,CAAC,QAAQ,EAAE,GAAG,CAAC,EAAE;wBACjD,IAAA,yBAAkB,EAAC,GAAG,CAAC,CAAC;wBACxB,MAAM,CAAC,GAAG,CAAC,CAAC;oBACd,CAAC,CAAC,CAAC;iBACJ;YACH,CAAC,CAAC,CAAC;QACL,CAAC,CAAC,CAAC;IACL,CAAC;IAEO,gBAAgB;QACtB,IAAI,IAAI,CAAC,YAAY;YAAE,OAAO;QAC9B,MAAM,KAAK,GAAG,GAAG,EAAE;YACjB,IAAI,CAAC,YAAY,GAAG,IAAI,CAAC;YACzB,IAAI,CAAC,cAAc,EAAE;iBAClB,OAAO,CAAC,GAAG,EAAE;gBACZ,IAAI,CAAC,YAAY,GAAG,KAAK,CAAC;gBAC1B,IAAI,IAAI,CAAC,cAAc,CAAC,MAAM,GAAG,CAAC,EAAE;oBAClC,IAAI,CAAC,WAAW,EAAE,CAAC;oBACnB,IAAI,CAAC,gBAAgB,EAAE,CAAC;iBACzB;YACH,CAAC,CAAC;iBACD,KAAK,CAAC,CAAC,CAAC,EAAE;gBACT,IAAI,CAAC,YAAY,GAAG,KAAK,CAAC;gBAC1B,IAAA,yBAAkB,EAAC,CAAC,CAAC,CAAC;YACxB,CAAC,CAAC,CAAC;QACP,CAAC,CAAC;QACF,6DAA6D;QAC7D,IAAI,IAAI,CAAC,cAAc,CAAC,MAAM,IAAI,IAAI,CAAC,mBAAmB,EAAE;YAC1D,OAAO,KAAK,EAAE,CAAC;SAChB;QACD,IAAI,IAAI,CAAC,MAAM,KAAK,SAAS;YAAE,OAAO;QACtC,IAAI,CAAC,MAAM,GAAG,UAAU,CAAC,GAAG,EAAE,CAAC,KAAK,EAAE,EAAE,IAAI,CAAC,qBAAqB,CAAC,CAAC;QACpE,IAAA,iBAAU,EAAC,IAAI,CAAC,MAAM,CAAC,CAAC;IAC1B,CAAC;IAEO,WAAW;QACjB,IAAI,IAAI,CAAC,MAAM,KAAK,SAAS,EAAE;YAC7B,YAAY,CAAC,IAAI,CAAC,MAAM,CAAC,CAAC;YAC1B,IAAI,CAAC,MAAM,GAAG,SAAS,CAAC;SACzB;IACH,CAAC;CAGF;AApOD,wDAoOC","sourcesContent":["/*\n * Copyright The OpenTelemetry Authors\n *\n * Licensed under the Apache License, Version 2.0 (the \"License\");\n * you may not use this file except in compliance with the License.\n * You may obtain a copy of the License at\n *\n * https://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\nimport { context, Context, diag, TraceFlags } from '@opentelemetry/api';\nimport {\n BindOnceFuture,\n ExportResultCode,\n getEnv,\n globalErrorHandler,\n suppressTracing,\n unrefTimer,\n} from '@opentelemetry/core';\nimport { Span } from '../Span';\nimport { SpanProcessor } from '../SpanProcessor';\nimport { BufferConfig } from '../types';\nimport { ReadableSpan } from './ReadableSpan';\nimport { SpanExporter } from './SpanExporter';\n\n/**\n * Implementation of the {@link SpanProcessor} that batches spans exported by\n * the SDK then pushes them to the exporter pipeline.\n */\nexport abstract class BatchSpanProcessorBase\n implements SpanProcessor\n{\n private readonly _maxExportBatchSize: number;\n private readonly _maxQueueSize: number;\n private readonly _scheduledDelayMillis: number;\n private readonly _exportTimeoutMillis: number;\n\n private _isExporting = false;\n private _finishedSpans: ReadableSpan[] = [];\n private _timer: NodeJS.Timeout | undefined;\n private _shutdownOnce: BindOnceFuture;\n private _droppedSpansCount: number = 0;\n\n constructor(\n private readonly _exporter: SpanExporter,\n config?: T\n ) {\n const env = getEnv();\n this._maxExportBatchSize =\n typeof config?.maxExportBatchSize === 'number'\n ? config.maxExportBatchSize\n : env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE;\n this._maxQueueSize =\n typeof config?.maxQueueSize === 'number'\n ? config.maxQueueSize\n : env.OTEL_BSP_MAX_QUEUE_SIZE;\n this._scheduledDelayMillis =\n typeof config?.scheduledDelayMillis === 'number'\n ? config.scheduledDelayMillis\n : env.OTEL_BSP_SCHEDULE_DELAY;\n this._exportTimeoutMillis =\n typeof config?.exportTimeoutMillis === 'number'\n ? config.exportTimeoutMillis\n : env.OTEL_BSP_EXPORT_TIMEOUT;\n\n this._shutdownOnce = new BindOnceFuture(this._shutdown, this);\n\n if (this._maxExportBatchSize > this._maxQueueSize) {\n diag.warn(\n 'BatchSpanProcessor: maxExportBatchSize must be smaller or equal to maxQueueSize, setting maxExportBatchSize to match maxQueueSize'\n );\n this._maxExportBatchSize = this._maxQueueSize;\n }\n }\n\n forceFlush(): Promise {\n if (this._shutdownOnce.isCalled) {\n return this._shutdownOnce.promise;\n }\n return this._flushAll();\n }\n\n // does nothing.\n onStart(_span: Span, _parentContext: Context): void {}\n\n onEnd(span: ReadableSpan): void {\n if (this._shutdownOnce.isCalled) {\n return;\n }\n\n if ((span.spanContext().traceFlags & TraceFlags.SAMPLED) === 0) {\n return;\n }\n\n this._addToBuffer(span);\n }\n\n shutdown(): Promise {\n return this._shutdownOnce.call();\n }\n\n private _shutdown() {\n return Promise.resolve()\n .then(() => {\n return this.onShutdown();\n })\n .then(() => {\n return this._flushAll();\n })\n .then(() => {\n return this._exporter.shutdown();\n });\n }\n\n /** Add a span in the buffer. */\n private _addToBuffer(span: ReadableSpan) {\n if (this._finishedSpans.length >= this._maxQueueSize) {\n // limit reached, drop span\n\n if (this._droppedSpansCount === 0) {\n diag.debug('maxQueueSize reached, dropping spans');\n }\n this._droppedSpansCount++;\n\n return;\n }\n\n if (this._droppedSpansCount > 0) {\n // some spans were dropped, log once with count of spans dropped\n diag.warn(\n `Dropped ${this._droppedSpansCount} spans because maxQueueSize reached`\n );\n this._droppedSpansCount = 0;\n }\n\n this._finishedSpans.push(span);\n this._maybeStartTimer();\n }\n\n /**\n * Send all spans to the exporter respecting the batch size limit\n * This function is used only on forceFlush or shutdown,\n * for all other cases _flush should be used\n * */\n private _flushAll(): Promise {\n return new Promise((resolve, reject) => {\n const promises = [];\n // calculate number of batches\n const count = Math.ceil(\n this._finishedSpans.length / this._maxExportBatchSize\n );\n for (let i = 0, j = count; i < j; i++) {\n promises.push(this._flushOneBatch());\n }\n Promise.all(promises)\n .then(() => {\n resolve();\n })\n .catch(reject);\n });\n }\n\n private _flushOneBatch(): Promise {\n this._clearTimer();\n if (this._finishedSpans.length === 0) {\n return Promise.resolve();\n }\n return new Promise((resolve, reject) => {\n const timer = setTimeout(() => {\n // don't wait anymore for export, this way the next batch can start\n reject(new Error('Timeout'));\n }, this._exportTimeoutMillis);\n // prevent downstream exporter calls from generating spans\n context.with(suppressTracing(context.active()), () => {\n // Reset the finished spans buffer here because the next invocations of the _flush method\n // could pass the same finished spans to the exporter if the buffer is cleared\n // outside the execution of this callback.\n let spans: ReadableSpan[];\n if (this._finishedSpans.length <= this._maxExportBatchSize) {\n spans = this._finishedSpans;\n this._finishedSpans = [];\n } else {\n spans = this._finishedSpans.splice(0, this._maxExportBatchSize);\n }\n\n const doExport = () =>\n this._exporter.export(spans, result => {\n clearTimeout(timer);\n if (result.code === ExportResultCode.SUCCESS) {\n resolve();\n } else {\n reject(\n result.error ??\n new Error('BatchSpanProcessor: span export failed')\n );\n }\n });\n\n let pendingResources: Array> | null = null;\n for (let i = 0, len = spans.length; i < len; i++) {\n const span = spans[i];\n if (\n span.resource.asyncAttributesPending &&\n span.resource.waitForAsyncAttributes\n ) {\n pendingResources ??= [];\n pendingResources.push(span.resource.waitForAsyncAttributes());\n }\n }\n\n // Avoid scheduling a promise to make the behavior more predictable and easier to test\n if (pendingResources === null) {\n doExport();\n } else {\n Promise.all(pendingResources).then(doExport, err => {\n globalErrorHandler(err);\n reject(err);\n });\n }\n });\n });\n }\n\n private _maybeStartTimer() {\n if (this._isExporting) return;\n const flush = () => {\n this._isExporting = true;\n this._flushOneBatch()\n .finally(() => {\n this._isExporting = false;\n if (this._finishedSpans.length > 0) {\n this._clearTimer();\n this._maybeStartTimer();\n }\n })\n .catch(e => {\n this._isExporting = false;\n globalErrorHandler(e);\n });\n };\n // we only wait if the queue doesn't have enough elements yet\n if (this._finishedSpans.length >= this._maxExportBatchSize) {\n return flush();\n }\n if (this._timer !== undefined) return;\n this._timer = setTimeout(() => flush(), this._scheduledDelayMillis);\n unrefTimer(this._timer);\n }\n\n private _clearTimer() {\n if (this._timer !== undefined) {\n clearTimeout(this._timer);\n this._timer = undefined;\n }\n }\n\n protected abstract onShutdown(): void;\n}\n"]}