import { isNodeJS } from "../../compat";
import { formatValueWithUnits } from "../../measurement/UnitFormatter";
import { fetchJson } from "../../net/fetch";
import { base64EncArr, BBOX_SIZE, FRAG_SIZE, serializeBoundingBox, serializeFragment } from "../../wgs/scene/FragmentData";
import * as dte from "../DtEventTypes";
import { base64DecToArr, base64DecodedLen } from "../encoding/base64";
import { blobToJson } from "../encoding/utf8";
import { ELEMENT_CLONE_OVERLAY } from "../facets/FacetVizEffects";
import { ColumnFamilies, ElementFlags, ModelIdSize, QC, RowKeySize } from "../schema/dt-schema";
import { DtConstants } from "../schema/DtConstants";
import { DataVizControls } from "./DataVizControls";
import { StreamHeatmap } from "./StreamHeatmap";
import { StreamMarkers } from "./StreamMarkers";
import { StreamUtils } from "./StreamUtils";
import ReadingsByDate from "./ReadingsByDate";
import { TelemetryView } from "./timeline/TelemetryView";

const { StreamsImportExportTableHeader, AttributeType, ChangeTypes } = DtConstants;

// Effectively, types of data we support in our time-series workflows. Everything will be normalized to floats under the hood though
const SUPPORTED_ATTR_TYPES = { [AttributeType.Integer]: true, [AttributeType.Float]: true, [AttributeType.Double]: true };
const DEFAULT_STREAM_REFRESH_INTERVAL = 60 * 1000;

const CONNECTIONS_CATEGORY = DtConstants.RC['IoT Connections'];

// TODO: generalize (beyond position+normal) and move to more appropriate location
function createInterleavedGeom(bufferGeom) {

  const vertices = bufferGeom.attributes.position.array;
  const normals = bufferGeom.attributes.normal.array;
  const indices = bufferGeom.attributes.index.array;

  var geom = new THREE.BufferGeometry();

  var vb = new Float32Array(vertices.length + normals.length);

  geom.vbstride = 6;

  geom.vb = vb;
  geom.ib = indices;

  // fill interleaved vb
  var dst = 0;
  for (var i = 0; i < vertices.length; i += 3) {
    vb[dst++] = vertices[i];
    vb[dst++] = vertices[i + 1];
    vb[dst++] = vertices[i + 2];
    vb[dst++] = normals[i];
    vb[dst++] = normals[i + 1];
    vb[dst++] = normals[i + 2];
  }

  // position attribute
  var attrPos = new THREE.BufferAttribute(undefined, 3);
  attrPos.itemOffset = 0;
  geom.attributes.position = attrPos;

  // normal attribute
  var attrNormal = new THREE.BufferAttribute(undefined, 3);
  attrNormal.itemOffset = 3;
  geom.attributes.normal = attrNormal;

  // index attribute
  var attrIndex = new THREE.BufferAttribute(undefined, 1);
  attrIndex.bytesPerItem = 2;
  geom.attributes.index = attrIndex;

  geom.attributesKeys = Object.keys(geom.attributes);

  return geom;
}

// Ensures that network calls are not duplicated
function deduplicatedFetch(streamIDs, fetchCB, inflightCache) {
  const idsToRequest = streamIDs.filter((id) => !inflightCache.has(id));

  if (idsToRequest.length > 0) {
    // Set batched data retrieval.
    const request = fetchCB(idsToRequest).
    finally(() => idsToRequest.forEach((id) => inflightCache.delete(id)));
    // Update inflight cache
    idsToRequest.forEach((id) => inflightCache.set(id, request));
  }

  return Promise.all(streamIDs.map((id) => inflightCache.get(id)));
}

export class StreamManager {
  constructor(facility) {
    this.facility = facility;
    this.viewer = undefined;
    this.defaultModel = undefined;

    this.inflightCaches = {
      infos: new Map(),
      lastReadings: new Map()
    };
    this.lastReadingsCache = {};
    this.streamsCache = new Map();
    this.readingsByDate = new ReadingsByDate(this);
    this.view = new TelemetryView(this);
    this.boundOnModelChanged = this.onModelChanged.bind(this);
    this.boundOnModelMetaChanged = this.onModelMetaChanged.bind(this);

    this.refreshInterval = DEFAULT_STREAM_REFRESH_INTERVAL;
    this.refreshSubscribers = 0;

    this.heatmap = new StreamHeatmap(this, facility.eventTarget);
    if (!isNodeJS()) {
      // stream markers are relying on DOM API, so we exclude it from nodeJS version
      this.markers = new StreamMarkers(this, facility.eventTarget);

      this.dataVizToolbarCtrl = new DataVizControls(this, facility.eventTarget, this.heatmap, this.markers);
    }
  }

  // elementId of the root document (which grants access to the webhook endpoints)
  getRootElementID() {
    return 'AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA';
  }

  // Actual overlay and markers initialization only happen when a default model is found
  _initImpl() {
    this.streamsCache = new Map();
    this.view.init(this.viewer);
    this.dataVizToolbarCtrl?.init(this.viewer);
    this.heatmap.init(this.viewer);
    this.markers?.init(this.viewer);
    this.facility.eventTarget.addEventListener(dte.DT_MODEL_CHANGED_EVENT, this.boundOnModelChanged);
    this.facility.eventTarget.addEventListener(dte.DT_FACILITY_CHANGED_EVENT, this.boundOnModelChanged);
  }

  onModelMetaChanged(_ref) {let { change } = _ref;
    if (change.type === 'create' && change.model.urn() === this.facility.getDefaultModelId()) {
      this._initImpl();
      this.facility.eventTarget.removeEventListener(dte.DT_MODEL_METADATA_CHANGED_EVENT, this.boundOnModelMetaChanged);
    }
  }

  init(viewer) {
    this.viewer = viewer;

    // only run internal initialization if there's a default model
    const model = this.facility.getDefaultModel();
    if (model) {
      this._initImpl();
    } else {
      // defer actual init until a default model is created
      this.facility.eventTarget.addEventListener(dte.DT_MODEL_METADATA_CHANGED_EVENT, this.boundOnModelMetaChanged);
    }
  }

  dispose() {
    this.viewer = undefined;
    this.view.dispose();
    if (this.markers?.viewer) {
      // Clean up if overlay & markers initialization happened
      this.facility.facetsManager.facetsEffects.clearStreamsOverlay();
      this.dataVizToolbarCtrl?.dispose();
      this.heatmap.dispose();
      this.markers?.dispose();
      this.streamsCache.clear();
      this.facility.eventTarget.removeEventListener(dte.DT_MODEL_CHANGED_EVENT, this.boundOnModelChanged);
      this.facility.eventTarget.removeEventListener(dte.DT_FACILITY_CHANGED_EVENT, this.boundOnModelChanged);
    } else {
      // Remove listener waiting for default model creation
      this.facility.eventTarget.removeEventListener(dte.DT_MODEL_METADATA_CHANGED_EVENT, this.boundOnModelMetaChanged);
    }
  }

  async _getDefaultModel() {
    try {
      this.defaultModel = this.facility.getDefaultModel();

      if (!this.defaultModel) {
        return null;
      }

      await this.defaultModel.waitForLoad(false, true);
      return this.defaultModel;
    } catch (e) {
      console.warn('Unable to wait for default model', e);
      return null;
    }
  }

  async _getOrCreateDefaultModel() {
    await this._getDefaultModel();

    if (!this.defaultModel) {
      await this.facility.createDefaultModel();
      this.defaultModel = this.facility.getDefaultModel();
    }

    return this.defaultModel;
  }

  onHostHighlightChange(_ref2) {let { model, hostId, highlight } = _ref2;
    if (highlight) {
      this.facility.facetsManager.facetsEffects.addElementHighlight(model, hostId, ELEMENT_CLONE_OVERLAY);
    } else {
      this.facility.facetsManager.facetsEffects.removeElementHighlight(model, hostId, ELEMENT_CLONE_OVERLAY);
    }
  }

  async makeStreamGeometry() {
    if (!this.streamGeometryPromise) {
      this.streamGeometryPromise = (async () => {
        const streamGeom = createInterleavedGeom(new THREE.SphereBufferGeometry(0.5, 32, 24));
        const res = await this.defaultModel.createGeometry(streamGeom);
        return res;
      })();
    }
    return this.streamGeometryPromise;
  }

  async addStreamFragmentFromParent(element, model, dbId) {
    const box = model.getElementBounds(dbId);
    if (box.isEmpty()) {
      console.error('Failed to obtain bounding box of parent geometry');
      return;
    }
    const center = box.getCenter();
    center.add(model.getGlobalOffset());


    const { hash } = await this.makeStreamGeometry();
    // TODO: here we are ignoring the transform, as the geometry is the unit sphere.
    let m = new THREE.Matrix4();
    m.setPosition(center);
    const fragmentData = serializeFragment(0, hash, null, m.toArray());
    const lmvFragment = base64EncArr(fragmentData, 0, FRAG_SIZE);

    const bounds = new THREE.Box3();
    bounds.setFromCenterAndSize(center, new THREE.Vector3(1, 1, 1));
    const boundingBoxData = serializeBoundingBox(bounds);
    const lmvBBox = base64EncArr(boundingBoxData, 0, BBOX_SIZE);

    element[DtConstants.QC.LmvFragment] = lmvFragment;
    element[DtConstants.QC.LmvBoundingBox] = lmvBBox;
  }

  // create a logical stream element and link it to some physical asset - link information has to be provided all the time as we do not support creation free floating logical stream elements.
  async createStream(name, modelUrnOfParent, dbIdOfParent) {let extraProps = arguments.length > 3 && arguments[3] !== undefined ? arguments[3] : {};let wantGeometry = arguments.length > 4 && arguments[4] !== undefined ? arguments[4] : false;
    const modelOfParent = this.facility.getModelByUrn(modelUrnOfParent);

    const props = {
      [QC.Name]: name,
      [QC.CategoryId]: CONNECTIONS_CATEGORY,
      [QC.ElementFlags]: ElementFlags.Stream,
      [QC.UniformatClass]: "D7070",
      ...extraProps
    };

    if (!this.defaultModel) {
      await this._getOrCreateDefaultModel();
    }

    if (modelOfParent && dbIdOfParent) {
      await this.addParentXRef([props], [[modelUrnOfParent, dbIdOfParent]]);

      if (wantGeometry) {
        await this.addStreamFragmentFromParent(props, modelOfParent, dbIdOfParent);
      }
    }

    const [newElmKey] = await this.defaultModel.createElementsWithData([props], 'Create streams');

    // set stream ingestion password
    await this.resetStreamSecrets([newElmKey]);
    return newElmKey;
  }

  //  defines which columns will be available in the template - currently we base it of our default parameter set applied to every created stream + standard override property for name
  getStreamsBulkImportTemplate() {
    const template = { ...StreamsImportExportTableHeader };

    return [
    template,
    { "Name": "Stream 1", "Assembly Code": "D7070", "Classification": "", "HostModelID": "", "HostElementID": "" },
    { "Name": "Stream 2", "Assembly Code": "D7070", "Classification": "", "HostModelID": "", "HostElementID": "" }];

  }

  /**
   * Import streams from JSON data.
   * Supports updates to streams by providing the fully-qualified element ID and facility ID. Ignores unchanged streams.
   * Ignores empty values in the CSV, unsetting values is not supported
   * @param {Object} schema
   * @param {Object[]} data
   * @returns {string[]} created/modified stream ids
   */
  async importStreamsFromJsonTemplate(schema, data) {
    const streams = await this.getAllStreamInfos();
    const existingStreams = new Map(streams.map((s) => [s.fullId, s]));

    const defaultModel = await this._getOrCreateDefaultModel();
    const dbId2catId = defaultModel.getData().dbId2catId;

    const parents = {};


    for (let props of data) {
      if (!props.fullId) {
        continue;
      }
      const stream = existingStreams.get(props.fullId);

      if (!stream?.hostElement) {
        continue;
      }

      const [elementId] = await stream.hostElement.model.getElementIdsFromDbIds([stream.hostElement.hostId]);
      parents[stream.fullId] = { modelId: stream.hostElement.model.urn(), elementId };
    }

    const updatedStreams = [];
    const newStreams = [];

    for (let streamProps of data) {
      const res = {
        [QC.ElementFlags]: ElementFlags.Stream
      };

      const existingStream = existingStreams.get(streamProps?.fullId);

      // Compare with existing streams in the facility and filter out incorrect and unchanged
      if (streamProps?.fullId?.length) {
        if (!existingStream) {
          console.error(`Unknkown full id for connection "${streamProps.Name}", skipping...`);
          continue;
        }

        // Check that it is coming from the right facility
        if (!streamProps.facility?.length) {
          console.error(`Facility ID is missing for "${streamProps.Name}", skipping...`);
          continue;
        }

        if (streamProps.facility !== this.facility.urn()) {
          console.error(`Connection "${streamProps.Name}" seems to be coming from another facility. ${streamProps.facility} instead of ${this.facility.urn()}, skipping...`);
          continue;
        }

        // Skip streams where nothing changed
        const noNameChanges = !streamProps.Name || streamProps.Name === existingStream.name;
        const noClassificationChanges = !streamProps.Classification || streamProps.Classification === existingStream.userClassification;
        const noAssemblyCodeChanges = !streamProps["Assembly Code"] || streamProps["Assembly Code"] === existingStream.dtClass;
        const noHostModelChange = !streamProps.HostModelID || streamProps.HostModelID === existingStream.hostElement?.model.urn();
        const noHostElementChange = !streamProps.HostElementID || streamProps.HostElementID === parents[streamProps.fullId]?.elementId;

        if (noNameChanges && noClassificationChanges && noAssemblyCodeChanges && noHostModelChange && noHostElementChange) {
          continue;
        }

        // Fill in missing categoryId for older streams
        if (dbId2catId[existingStream.dbId] !== CONNECTIONS_CATEGORY) {
          res[QC.CategoryId] = CONNECTIONS_CATEGORY;
        }
        res[QC.LmvDbId] = existingStream.dbId;
        updatedStreams.push(res);
      } else {
        // Add default values for new connections
        res[QC.CategoryId] = CONNECTIONS_CATEGORY;
        res[QC.UniformatClass] = "D7070";
        res[QC.Classification] = "";
        newStreams.push(res);
      }

      const updateLink = {};
      for (let prop in StreamsImportExportTableHeader) {
        if (!streamProps[prop]) {
          // property is not set for the stream
          continue;
        }

        switch (schema[prop]) {
          case StreamsImportExportTableHeader.Name:
            if (!existingStream) {
              res[QC.Name] = streamProps[prop];
            } else if (existingStream.name !== streamProps[prop]) {
              res[QC.OName] = streamProps[prop];
            }
            continue;

          case StreamsImportExportTableHeader["Assembly Code"]:
            if (!existingStream) {
              res[QC.UniformatClass] = streamProps[prop];
            } else if (existingStream.dtClass !== streamProps[prop]) {
              res[QC.OUniformatClass] = streamProps[prop];
            }
            continue;

          case StreamsImportExportTableHeader.Classification:
            if (!existingStream) {
              res[QC.Classification] = streamProps[prop];
            } else if (existingStream.userClassification !== streamProps[prop]) {
              res[QC.OClassification] = streamProps[prop];
            }
            continue;
          case StreamsImportExportTableHeader.HostElementID:
          case StreamsImportExportTableHeader.HostModelID:
            if (streamProps[prop]) {
              updateLink[prop] = streamProps[prop];
            }
            continue;
          default:
            res[ColumnFamilies.DtProperties + ":" + schema[prop]] = streamProps[prop];
        }
      }

      if (updateLink.HostElementID && updateLink.HostModelID) {
        if (
        parents[res.k]?.modelId !== updateLink.HostModelID ||
        parents[res.k]?.elementId !== updateLink.HostElementID)
        {
          // we know the update contains a new link, can convert it to element properties now
          const targetModel = this.facility.getModelByUrn(updateLink.HostModelID);
          if (!targetModel) {
            console.error(`Failed to find target model ${modelId}, skipping the link creation`);
            props.push(res);
            continue;
          }
          const [targetDbId] = await targetModel.getDbIdsFromElementIds([updateLink.HostElementID]);

          if (!targetDbId) {
            if (!targetModel) {
              console.error(`Failed to find target dbId for ${elementId}, skipping the link creation`);
              props.push(res);
              continue;
            }
          }
          await this.addParentXRef([res], [[updateLink.HostModelID, targetDbId]]);
        }
      }
    }

    // No changes detected
    if (!updatedStreams.length && !newStreams.length) {
      console.log('Import finished without changes to connections');
      return [];
    }

    if (!this.defaultModel) {
      await this._getOrCreateDefaultModel();
    }

    const [newKeys, _] = await Promise.all([
    newStreams.length ? this.defaultModel.createElementsWithData(newStreams, 'Create streams') : [],
    updatedStreams.length ? this.defaultModel.updateElementsWithData(updatedStreams, 'Update streams') : []]
    );

    if (newKeys.length) {
      // for new streams set streams secrets
      await this.resetStreamSecrets(newKeys);
    }

    console.log(`Connections import result: ${newKeys.length} new, ${updatedStreams.length} modified, ${data.length - updatedStreams.length - newKeys.length} skipped connections.`);

    return newKeys;
  }

  /**
   * Creates table-like JSON object from given streams
   * @param {Stream[]} [streams] if provided will export only given streams
   * @returns {Promise<*[]>}
   */
  async exportStreamsToJson() {let streams = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : [];
    const defaultModel = this.facility.getDefaultModel();

    if (!streams?.length) {
      console.warn('streams are missing');
      return [StreamsImportExportTableHeader];
    }

    const keys = streams.map((s) => s.fullId);

    const secrets = await fetchJson(this.facility.loadContext, `/models/${defaultModel.urn()}/getstreamssecrets`, "POST", { keys });

    const res = await Promise.all(streams.map(async (s) => {
      const { name, fullId, streamUrlRaw, dtClass, userClassification } = s;
      const ingestionSecret = secrets[fullId];
      if (!ingestionSecret) {
        console.error(`secret is missing for stream ${fullId}`);
      }

      const ingestionUrl = this._makeIngestionUrl(streamUrlRaw, ingestionSecret);
      let elmId;

      if (s.hostElement) {
        [elmId] = await s?.hostElement.model.getElementIdsFromDbIds([s.hostElement.hostId]);
      }
      //  this is the list of data that is actually exported
      return {
        Name: name,
        "Assembly Code": dtClass,
        Classification: userClassification,
        facility: this.facility.urn(),
        fullId,
        ingestionUrl,
        HostElementID: elmId,
        HostModelID: s.hostElement?.model.urn()
      };

    }));

    return [StreamsImportExportTableHeader, ...res];
  }

  async updateLink(streams, hosts) {
    if (streams.length !== hosts.length) {
      console.error("Streams/Targets length mismatch");
      return;
    }

    if (streams.length < 1) {
      return; // no change
    }

    const changeData = streams.map((stream) => ({
      [QC.LmvDbId]: stream.dbId,
      [QC.XParent]: ""
    }));

    await this.addParentXRef(changeData, hosts);

    const defaultModel = this.facility.getDefaultModel();
    return await defaultModel.updateElementsWithData(changeData, "Update streams relationship");
  }

  validateThresholds(thresholds) {
    if (!thresholds) return true; // effectively removes thresholds
    const { lower, upper } = thresholds;
    const numbers = [lower?.alert, lower?.warn, upper?.warn, upper?.alert].filter(isFinite);

    if (numbers.length === 1) return true; // single threshold is always valid

    for (let n = 1; n < numbers.length; n++) {
      // Must be an array of growing numbers
      if (numbers[n - 1] >= numbers[n]) return false;
    }

    return true;
  }

  /**
   * Apply and remove thresholds for the given streams and parameter
   * @param {Streams[]} streams
   * @param {string} attrID
   * @param {{ name: string, lower: { alert: string, warn: string }, upper: { alert: string, warn: string }}} [newThresholds] omit for removing thresholds
   */
  async updateBulkThresholds(streams, attrID, newThresholds) {
    if (!streams?.length) {
      return; // no change
    }

    if (!this.validateThresholds(newThresholds)) {
      throw new Error(`Invalid threshold value(s) for attribute: ${attrID}`);
    }

    if (!streams.every((stream) => stream.streamAttrs.some((attr) => attr.id === attrID))) {
      throw new Error(`Attribute ${attrID} is not applied to some of the streams`);
    }

    const changeData = [];

    for (const stream of streams) {
      // Rebuild old thresholds because we want to keep streamAttrs.allowedValues.thresholds schema
      const thresholdsByAttrID = stream.streamAttrs.reduce((th, attr) => {
        th[attr.id] = attr.allowedValues?.thresholds;
        return th;
      }, {});

      thresholdsByAttrID[attrID] = newThresholds;

      let storedThresholds;
      for (const aID in thresholdsByAttrID) {
        // Skip undefined thresholds when building BLOB
        if (thresholdsByAttrID[aID]) {
          storedThresholds ??= {};
          storedThresholds[aID] = thresholdsByAttrID[aID];
        }
      }

      const settings = structuredClone(stream.settings || {});

      if (storedThresholds) {
        settings.thresholds = storedThresholds;
      } else {
        delete settings.thresholds;
      }

      const change = {
        [QC.LmvDbId]: stream.dbId,
        [QC.Settings]: Object.keys(settings).length > 0 ? settings : null
      };

      changeData.push(change);
    }

    const defaultModel = this.facility.getDefaultModel();
    await defaultModel.updateElementsWithData(changeData, "Update streams thresholds");

    // Update cached value
    const updatedStreams = [];

    for (const stream of streams) {
      const attr = stream.streamAttrs.find((attr) => attr.id === attrID);
      attr.allowedValues ??= {};
      attr.allowedValues.thresholds = newThresholds;
      updatedStreams.push(stream.dbId);
    }

    this.facility.eventTarget.dispatchEvent({ type: dte.DT_STREAMS_INFO_CHANGED_EVENT, dbIds: updatedStreams });
  }

  /**
   * @returns {Promise<{ name: string, lower: { alert: string, warn: string }, upper: { alert: string, warn: string }, streams: Stream[], attribute: Attr}[]>} - Compiled list of thresholds, augmented with streams and the attributes they target
   */
  async getThresholds() {
    const thresholds = new Map();

    const streams = await this.getAllStreamInfos();

    for (let stream of streams) {
      for (let attr of stream.streamAttrs) {
        if (attr.allowedValues?.thresholds) {
          const threshold = attr.allowedValues.thresholds;

          if (thresholds.has(threshold.name)) {

            if (thresholds.get(threshold.name).attribute.id === attr.id) {
              thresholds.get(threshold.name).streams.push(stream);
            } else {
              console.warn(`Wrong threshold-attribute assignment. Threshold: ${threshold.name} / Stream ID: ${stream.fullId} / Attribute: ${attr.id}`);
            }

            continue;
          }

          thresholds.set(threshold.name, { ...threshold, attribute: attr, streams: [stream] });
        }
      }
    }

    return Array.from(thresholds.values());
  }

  /**
   * Data Points for the given stream, can be from cache.
   *
   * As this function is often called simultaneously on an empty cache, it takes extra-precaution to ensure network
   * requests are not unnecessarily duplicated.
   * @param {number[]} streamdbIds Stream IDs
   * @param {boolean} wantAlerts if threshold states should be computed
   * @returns {Promise<*[]>}
   */
  async getLastReadings() {let streamdbIds = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : [];let wantAlerts = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : false;
    if (streamdbIds.length === 0) {
      return [];
    }

    const streamIDs = await this.facility.getDefaultModel().getElementIdsFromDbIds(streamdbIds);

    const idsNotCached = streamIDs.filter((id) => !this.lastReadingsCache.hasOwnProperty(id));
    if (idsNotCached.length > 0) {
      await deduplicatedFetch(idsNotCached, this._fetchLastReadingsAndUpdateCache.bind(this), this.inflightCaches.lastReadings);
    }

    let readingsByStreamIdx = streamIDs.map((id) => this.lastReadingsCache[id]);

    const formattedResults = [];
    for (let streamIdx in readingsByStreamIdx) {
      const dbId = streamdbIds[streamIdx];
      const stream = this.streamsCache.get(dbId);
      const readingsByAttrID = readingsByStreamIdx[streamIdx];
      if (wantAlerts && !readingsByAttrID) {
        formattedResults[streamIdx] = { state: DtConstants.StreamStates.NoData };
        continue;
      }
      // Build thresholdsByAttrID because we want to keep streamAttrs.allowedValues.thresholds schema
      const thresholdsByAttrID = stream?.streamAttrs.reduce((th, attr) => {
        th[attr.id] = attr.allowedValues?.thresholds;
        return th;
      }, {});

      // make a copy here to avoid modifying the cache entry.
      const enrichedData = {};
      for (let attrID in readingsByAttrID) {
        const [[ts, val]] = Object.entries(readingsByAttrID[attrID]);
        const newEntry = { ts: Number(ts), val };

        if (wantAlerts) {
          const thresholds = thresholdsByAttrID?.[attrID];
          newEntry.state = StreamUtils.evaluateState(thresholds, val, ts);
        }

        enrichedData[attrID] = newEntry;
      }
      formattedResults[streamIdx] = enrichedData;
    }

    return formattedResults;
  }

  async refreshStreamsLastReadings() {let streamdbIds = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : [];
    const streamIDs = streamdbIds.length ? await this.facility.getDefaultModel().getElementIdsFromDbIds(streamdbIds) : [];
    const streamsToFetch = streamIDs && streamIDs.length > 0 ? streamIDs : (await this.getAllStreamInfos()).map((stream) => stream.fullId);

    await this._fetchLastReadingsAndUpdateCache(streamsToFetch);
  }

  async _fetchLastReadingsAndUpdateCache() {let streamIDs = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : [];
    if (streamIDs.length === 0) {
      return;
    }

    const defaultModel = this.facility.getDefaultModel();

    let readings;
    try {
      readings = await fetchJson(
        this.facility.loadContext,
        `/timeseries/models/${defaultModel.urn()}/streams`,
        'POST',
        { keys: streamIDs });
    } catch (err) {
      console.log('Failed to fetch streams reading', err);
      return;
    }

    const updated = [];

    for (let key in readings) {
      const lastReading = this.lastReadingsCache[key];
      const newReading = readings[key];

      if (JSON.stringify(lastReading) !== JSON.stringify(newReading)) {
        this.lastReadingsCache[key] = readings[key];
        updated.push(key);
      }
    }

    if (updated.length > 0) {
      const dbIds = await defaultModel.getDbIdsFromElementIds(updated);
      this.facility.eventTarget.dispatchEvent({ type: dte.DT_STREAMS_LAST_READINGS_CHANGED_EVENT, keys: updated, dbIds });
    }
  }

  _makeIngestionUrl(rawStreamUrl, secret) {
    const url = new URL(this.facility.loadContext.endpoint);
    url.password = secret;
    url.pathname += rawStreamUrl;

    return url.href;
  }

  // returns an ingestion url for the given stream that allows user to push data without supplying forge JWT token
  async getStreamSecret(key) {
    const defaultModel = await this._getOrCreateDefaultModel();
    const res = await fetchJson(this.facility.loadContext, `/models/${defaultModel.urn()}/getstreamssecrets`, "POST", { keys: [key] });

    return res[key];
  }

  // unless hardReset is true, the reset will not replace existing secrets (so it can be used only to backfill empty ones)
  async resetStreamSecrets(keys) {let hardReset = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : false;
    const defaultModel = this.facility.getDefaultModel();
    return fetchJson(this.facility.loadContext, `/models/${defaultModel.urn()}/resetstreamssecrets`, "POST", {
      keys,
      hardReset
    });
  }

  // returns an ingestion url for the given stream that allows user to push data without supplying forge JWT token
  async getStreamIngestionUrl(stream) {
    const secret = await this.getStreamSecret(stream.fullId);
    return this._makeIngestionUrl(stream.streamUrlRaw, secret);
  }

  formatStreamValue(value, attribute) {
    if (attribute.forgeUnit) {
      return formatValueWithUnits(
        value,
        "autodesk.unit.unit:" + attribute.forgeUnit,
        3,
        attribute.precision || 1,
        { symbolKey: attribute.forgeSymbol }
      );
    }
    return value;
  }

  /**
   * Find attributes that can be used as "connected" for the given stream
   * @param {Object} stream stream object
   * @param {Object[]} [attrs] default model attrs, more efficient if possible in given context
   */
  async getAttrCandidates(stream, attrs) {
    const defaultModel = this.facility.getDefaultModel();
    if (!attrs) {
      attrs = await defaultModel.getAttributes({ native: true });
    }
    const props = await defaultModel.getPropertiesDt([stream.dbId]);
    const hasProps = new Set(Object.values(props[0].element.properties).map((_ref3) => {let { uuid } = _ref3;return uuid;}));
    const attrCandidates = attrs.filter((attr) => SUPPORTED_ATTR_TYPES[attr.dataType] && hasProps.has(attr.uuid));
    return attrCandidates;
  }

  async getAllConnectedAttributes() {
    const defaultModel = this.facility.getDefaultModel();
    if (!defaultModel) {
      return [];
    }

    await defaultModel.waitForLoad(false, true);

    const attrs = await defaultModel.getAttributes({ native: true });
    return attrs.filter((attr) => (attr.flags & DtConstants.AttributeFlags.afStream) !== 0);
  }

  async getAllStreamInfos() {let clearCache = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : false;
    const streamIds = await this.getStreamIds();
    const hadCache = !!this.streamsCache.size;

    if (clearCache) {
      this.streamsCache.clear();
      this.inflightCaches.infos.clear();
    }

    // All streams were removed
    if (streamIds.length === 0 && clearCache && hadCache) {
      this.facility.eventTarget.dispatchEvent({ type: dte.DT_STREAMS_INFO_CHANGED_EVENT, dbIds: [] });
      return [];
    }

    const missingFromCache = streamIds.filter((streamId) => !this.streamsCache.has(streamId));

    if (missingFromCache.length > 0) {
      await deduplicatedFetch(missingFromCache, this._fetchAllStreamInfoAndUpdateCache.bind(this), this.inflightCaches.infos);
    }

    return streamIds.
    map((streamId) => this.streamsCache.get(streamId))
    // Filter out undefined. Can happen when streamIds and cache are not in sync anymore due to other parallel events
    .filter(Boolean);
  }

  async _fetchAllStreamInfoAndUpdateCache() {let streamIDs = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : [];
    const defaultModel = this.facility.getDefaultModel();

    if (!defaultModel) {
      return [];
    }

    const dtPkg = defaultModel.getData();
    const parentXrefs = streamIDs.map((streamId) => dtPkg.xrefId2fullId[dtPkg.dbId2xparentId[streamId]]);

    const [fullIds, resolvedParentXRefs, props] = await Promise.all([
    defaultModel.getElementIdsFromDbIds(streamIDs),
    this.facility.resolveXrefs(parentXrefs),
    defaultModel.query({
      dbIds: streamIDs,
      includes: { standard: true, applied: true, element: true, type: false }
    })]
    );

    let updatedStreams = [];
    for (let idx = 0; idx < streamIDs.length; idx++) {
      const dbId = streamIDs[idx];
      const fullId = fullIds[idx];

      const row = props.rows[idx];

      const name = row[QC.Name];
      const dtClass = row[QC.OUniformatClass] || row[QC.UniformatClass];
      const userClassification = row[QC.OClassification] || row[QC.Classification];

      // to fetch raw data of the stream
      const streamUrlRaw = `/timeseries/models/${defaultModel.urn()}/streams/${fullId}`;

      // to fetch rolled up data of the stream - preferable option
      const streamUrl = `/timeseries/models/${defaultModel.urn()}/streams/${fullId}/rollups`;
      const candidates = await this.getAttrCandidates({ dbId }, props.cols);
      const streamAttrs = candidates.
      filter((attr) => (attr.flags & DtConstants.AttributeFlags.afStream) !== 0).
      map((attr) => {
        const _attr = structuredClone(attr);
        if (_attr.precision === 0) {
          // backend will fall back to "2" when precision is not set explictly
          // so we do the same here to make sure properties values formatting works as expected
          _attr.precision = 2;
        }

        // Remove legacy thresholds
        _attr.allowedValues ??= {};
        _attr.allowedValues.thresholds = null;
        return _attr;
      });

      const parent = resolvedParentXRefs[idx];
      if (parentXrefs[idx] && !parent) {
        console.warn(`Failed to resolve parent xref for stream "${name}" (${fullId})`);
      }

      let settings = null;
      if (row[QC.Settings]) {
        // Use per-element thresholds
        const enc = row[QC.Settings];
        const arr = new Uint8Array(base64DecodedLen(enc.length));
        base64DecToArr(enc, arr);
        settings = blobToJson(arr);

        if (settings?.thresholds) {
          // Overwrite all attribute thresholds if the thresholds exist in the settings row
          for (const attr of streamAttrs) {
            if (!attr) continue;
            attr.allowedValues.thresholds = settings.thresholds[attr.id]?.name ? settings.thresholds[attr.id] : null;
          }
        }
      }

      const info = {
        dbId,
        name,
        dtClass,
        userClassification,
        streamUrlRaw,
        streamUrl,
        fullStreamUrlRaw: this.facility.loadContext.endpoint + streamUrlRaw,
        fullStreamUrl: this.facility.loadContext.endpoint + streamUrl,
        fullId,
        hostElement: parent ? { model: parent.model, hostId: parent.dbId } : undefined,
        streamAttrs,
        settings
      };

      if (!this.streamsCache.has(dbId)) {
        this.streamsCache.set(dbId, info);
        updatedStreams.push(dbId);
      }
    }

    if (updatedStreams.length > 0)
    this.facility.eventTarget.dispatchEvent({ type: dte.DT_STREAMS_INFO_CHANGED_EVENT, dbIds: updatedStreams });
  }

  getAllStreamInfosFromCache() {
    return [...this.streamsCache.values()];
  }

  onModelChanged(e) {
    // Ignore stream data changes, as it doesn't affect the stream info
    if (e.change.ctype === ChangeTypes.DeleteStreamData) {
      return;
    }

    this.getAllStreamInfos(true);
  }

  // replicates the room references from a given element into elmData
  async addRoomFromElement(elmData, model, dbId) {
    const pkg = model.getData();
    const roomIds = pkg.dbId2roomIds[dbId];
    const xroomIds = pkg.dbId2xroomIds[dbId];

    let xrefs = [];
    if (Array.isArray(xroomIds)) {
      xrefs = xroomIds.map((xrefId) => pkg.xrefId2fullId[xrefId]);
    } else if (xroomIds) {
      xrefs.push(pkg.xrefId2fullId[xroomIds]);
    }

    if (roomIds) {
      const refIds = await model.getElementIdsFromDbIds(Array.isArray(roomIds) ? roomIds : [roomIds]);
      refIds.forEach((id) => xrefs.push([model.urn(), id]));
    }

    if (!xrefs.length) {
      return;
    }
    let buf = new Uint8Array(xrefs.length * RowKeySize);

    xrefs.forEach((_ref4, idx) => {let [urn, id] = _ref4;
      const modelGuid = urn.slice(DtConstants.DT_MODEL_URN_PREFIX.length + 1);
      base64DecToArr(modelGuid, buf, idx * RowKeySize);

      base64DecToArr(id, buf, idx * RowKeySize + ModelIdSize);
    });

    elmData[QC.XRooms] = base64EncArr(buf);
  }

  async addParentXRef(elements, linksTo) {
    //Convert link dbIds to database row IDs
    let allLinks = linksTo.reduce((acc, linkTo) => {
      if (linkTo) {
        let list = acc[linkTo[0]];
        if (!list) {
          acc[linkTo[0]] = list = [];
        }
        list.push(linkTo[1]);
      }
      return acc;
    }, {});

    let encodedIds = await this.facility.encodeIds(allLinks, true);

    for (let idx = 0; idx < elements.length; idx++) {
      const elmData = elements[idx];
      const linkTo = linksTo[idx];

      if (linkTo) {
        const [urn, dbId] = linkTo;
        const model = this.facility.getModelByUrn(urn);
        // XParent
        elmData[QC.XParent] = encodedIds[urn][dbId];

        // XRooms
        // here we only copy room references if the destination element has none of its own yet
        // TODO: let the caller and ultimately the user decide
        const elementId = elmData[QC.LmvDbId];
        const wantRoom = elementId === undefined || !this.defaultModel.getData().dbId2xroomIds[elementId]?.length;
        if (wantRoom) {
          await this.addRoomFromElement(elmData, model, dbId);
        }

        // Level
        const wantLevel = elementId === undefined || !this.defaultModel.getData().dbId2levelId[elementId];
        if (wantLevel) {
          const levelId = model.getData().dbId2levelId[dbId];
          if (levelId > 0) {
            await this.facility.addLevelFromElement(elmData, model, levelId);
          }
        }
      }
    }
  }

  /**
   * Deletes streams incl. timeseries data
   * @param {string[]} streamDbIds
   * @param {number} chunkSize batch size for deleting streams, avoids overloading the server in a single call
   */
  async deleteStreams(streamDbIds) {let chunkSize = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : 50;
    if (!streamDbIds?.length) {
      console.warn('stream ids are missing');
      return;
    }

    let defaultModel = this.facility.getDefaultModel();

    // delete stream elements
    await defaultModel.deleteElements(streamDbIds, 'stream-delete');
  }

  async getStreamIds() {
    const defaultModel = await this._getDefaultModel();

    if (!defaultModel) {
      return [];
    }

    let streamIds = [];
    const dbId2flags = defaultModel.getData().dbId2flags;
    for (let i = 0; i < dbId2flags.length; ++i) {
      if (dbId2flags[i] === ElementFlags.Stream) {
        streamIds.push(i);
      }
    }

    return streamIds;
  }

  // TODO: use dbIds here too
  async fetchBulkStreamRollups(streamIDs, model, begin, end, attrID) {
    return await fetchJson(this.facility.loadContext, `/timeseries/models/${model.urn()}/rollups?from=${begin}&to=${end}&substream=${attrID}`, 'POST', { keys: streamIDs });
  }

  setStreamRefreshInterval(refreshInterval) {
    this.refreshInterval = refreshInterval;
    if (this.refreshIntervalHandle) {
      clearInterval(this.refreshIntervalHandle);
      this.refreshIntervalHandle = setInterval(() => {
        this.refreshStreamsLastReadings();
      }, this.refreshInterval);
    }
  }

  async setOfflineTimeout(stream, offlineTimeoutInMinutes) {
    if (!stream) return;

    if (stream.offlineTimeoutInMinutes === offlineTimeoutInMinutes) return;

    if (typeof offlineTimeoutInMinutes !== 'number') {
      throw new Error(`offlineTimeoutInMinutes must be a numeric value`);
    }

    const settings = { ...(stream.settings || {}), offlineTimeoutInMinutes };

    const defaultModel = this.facility.getDefaultModel();

    await defaultModel.updateElementsWithData([
    {
      [QC.LmvDbId]: stream.dbId,
      [QC.Settings]: settings

    }],
    'Offline timeout set');
  }

  increaseStreamRefreshSubscriber() {
    if (this.refreshSubscribers == 0) {
      this.refreshIntervalHandle = setInterval(() => {
        this.refreshStreamsLastReadings();
      }, this.refreshInterval);
    }
    this.refreshSubscribers++;
  }

  decreaseStreamRefreshSubscriber() {
    this.refreshSubscribers--;
    if (this.refreshSubscribers == 0) {
      clearInterval(this.refreshIntervalHandle);
    }
  }
}