|
1 | 1 | import { Injectable, inject, OnDestroy } from '@angular/core'; |
2 | | -import { Subscription, Observable, ReplaySubject, MonoTypeOperatorFunction, interval, withLatestFrom, concat, skip, from } from 'rxjs'; |
| 2 | +import { Subscription, Observable, ReplaySubject, withLatestFrom, concat, skip, from, filter, merge, shareReplay, take, timer } from 'rxjs'; |
3 | 3 | import { AppSettingsService } from './app-settings.service'; |
4 | 4 | import { DataService, IPathUpdate } from './data.service'; |
5 | 5 | import { UUID } from '../utils/uuid.util' |
@@ -257,33 +257,41 @@ export class DatasetService implements OnDestroy { |
257 | 257 | this.setupServiceSubjectRegistry(newDataSourceConfig.uuid, newDataSourceConfig.maxDataPoints); |
258 | 258 | const dataSource = this._svcDataSource[this._svcDataSource.push(newDataSourceConfig) - 1]; |
259 | 259 |
|
260 | | - console.log(`[Dataset Service] Starting recording process: ${configuration.path}, Scale: ${configuration.timeScaleFormat}, Period: ${configuration.period}, Datapoints: ${newDataSourceConfig.maxDataPoints}`); |
261 | | - |
262 | | - // Emit at a regular interval using the last value. We use this and not sampleTime() to make sure that if there is no new data, we still send the last know value. This is to prevent dataset blanks that look ugly on the chart |
263 | | - function sampleInterval<IPathData>(period: number): MonoTypeOperatorFunction<IPathData> { |
264 | | - return (source) => interval(period).pipe(withLatestFrom(source, (_, value) => value)); |
265 | | - }; |
| 260 | + console.log( |
| 261 | + `[Dataset Service] Starting recording process: ${configuration.path}, Scale: ${configuration.timeScaleFormat}, Period: ${configuration.period}, Datapoints: ${newDataSourceConfig.maxDataPoints}` |
| 262 | + ); |
266 | 263 |
|
267 | 264 | // Decide how to interpret the dataset values (scalar vs radian domains) |
268 | 265 | const angleDomain = this.resolveAngleDomain(configuration.path, configuration.baseUnit); |
269 | 266 |
|
270 | | - // Subscribe to path data, update historicalData/stats and sends new values to Observers |
271 | | - dataSource.pathObserverSubscription = this.data.subscribePath(configuration.path, configuration.pathSource).pipe(sampleInterval(newDataSourceConfig.sampleTime)).subscribe( |
272 | | - (newValue: IPathUpdate) => { |
273 | | - if (newValue.data.value === null) return; // we don't need null values |
274 | | - |
275 | | - // Keep the array to specified size before adding new value |
276 | | - if (dataSource.maxDataPoints > 0 && dataSource.historicalData.length >= dataSource.maxDataPoints) { |
277 | | - dataSource.historicalData.shift(); |
278 | | - } |
279 | | - dataSource.historicalData.push(newValue.data.value); |
280 | | - |
281 | | - // Add new datapoint to historicalData |
282 | | - const datapoint: IDatasetServiceDatapoint = this.updateDataset(dataSource, configuration.baseUnit, angleDomain); |
283 | | - // Copy object new datapoint so it's not send by reference, then push to Subject so that Observers can receive |
284 | | - this._svcSubjectObserverRegistry.find(registration => registration.datasetUuid === dataSource.uuid).rxjsSubject.next(datapoint); |
285 | | - } |
| 267 | + // Share the latest non-null value so we can: |
| 268 | + // 1) emit immediately on first value (chart isn't blank) |
| 269 | + // 2) then emit periodically using the latest known value |
| 270 | + const path$ = this.data.subscribePath(configuration.path, configuration.pathSource).pipe( |
| 271 | + filter((u: IPathUpdate) => u?.data?.value !== null), |
| 272 | + shareReplay({ bufferSize: 1, refCount: true }) |
| 273 | + ); |
| 274 | + |
| 275 | + const firstValue$ = path$.pipe(take(1)); |
| 276 | + |
| 277 | + const sampled$ = timer(newDataSourceConfig.sampleTime, newDataSourceConfig.sampleTime).pipe( |
| 278 | + withLatestFrom(path$, (_tick, value) => value) |
286 | 279 | ); |
| 280 | + |
| 281 | + // Subscribe to path data, update historicalData/stats and send new values to Observers |
| 282 | + dataSource.pathObserverSubscription = merge(firstValue$, sampled$).subscribe((newValue: IPathUpdate) => { |
| 283 | + // Keep the array to specified size before adding new value |
| 284 | + if (dataSource.maxDataPoints > 0 && dataSource.historicalData.length >= dataSource.maxDataPoints) { |
| 285 | + dataSource.historicalData.shift(); |
| 286 | + } |
| 287 | + dataSource.historicalData.push(newValue.data.value); |
| 288 | + |
| 289 | + const datapoint: IDatasetServiceDatapoint = this.updateDataset(dataSource, configuration.baseUnit, angleDomain); |
| 290 | + |
| 291 | + this._svcSubjectObserverRegistry |
| 292 | + .find(registration => registration.datasetUuid === dataSource.uuid) |
| 293 | + .rxjsSubject.next(datapoint); |
| 294 | + }); |
287 | 295 | } |
288 | 296 |
|
289 | 297 | /** |
|
0 commit comments