1
+ import {
2
+ PollingDriftClientAccountSubscriber
3
+ } from './pollingDriftClientAccountSubscriber' ;
4
+
5
+ import {
6
+ OraclePriceData ,
7
+ OracleInfo
8
+ } from '../oracles/types' ;
9
+
10
+ import { getOracleId } from '../oracles/oracleId' ;
11
+
12
+ export class ExternalOracleDataDriftClientSubscriber extends PollingDriftClientAccountSubscriber {
13
+ private oracleLastUpdate = new Map < string , number > ( ) ;
14
+ private pollingOracles = new Map < string , boolean > ( ) ;
15
+ private oraclePollIntervalId : NodeJS . Timeout ;
16
+
17
+ constructor ( ...args : ConstructorParameters < typeof PollingDriftClientAccountSubscriber > ) {
18
+ super ( ...args ) ;
19
+
20
+ }
21
+
22
+ /** Override to prevent oracles from being automatically polled later */
23
+ public override updateOraclesToPoll ( ) : boolean {
24
+ return true ;
25
+ }
26
+
27
+ /** Public method to be called externally with fresh oracle data */
28
+ public feedOracle ( oracleInfo : OracleInfo , priceData : OraclePriceData , slot : number ) {
29
+ console . log ( 'ORACLEDATA feedOracle' ) ;
30
+ const oracleId = getOracleId ( oracleInfo . publicKey , oracleInfo . source ) ;
31
+ this . oracles . set ( oracleId , { data : priceData , slot } ) ;
32
+ this . oracleLastUpdate . set ( oracleId , Date . now ( ) ) ;
33
+
34
+ if ( this . pollingOracles . has ( oracleId ) ) {
35
+ const oracleToPoll = this . oraclesToPoll . get ( oracleId ) ;
36
+ if ( oracleToPoll ) {
37
+ this . accountLoader . removeAccount (
38
+ oracleToPoll . publicKey ,
39
+ oracleToPoll . callbackId
40
+ ) ;
41
+ this . pollingOracles . delete ( oracleId ) ;
42
+ }
43
+ }
44
+ }
45
+
46
+ public override async subscribe ( ) : Promise < boolean > {
47
+ await super . subscribe ( ) ;
48
+ this . removeAllOraclesFromAccountLoader ( ) ;
49
+ this . startOraclePollingWatchdog ( ) ;
50
+ return true ;
51
+ }
52
+
53
+ private startOraclePollingWatchdog ( ) {
54
+ this . oraclePollIntervalId = setInterval ( async ( ) => {
55
+ for ( const [ oracleId , oracleToPoll ] of this . oraclesToPoll . entries ( ) ) {
56
+ const lastUpdate = this . oracleLastUpdate . get ( oracleId ) || 0 ;
57
+ const now = Date . now ( ) ;
58
+ if ( now - lastUpdate > 70_000 && ! this . pollingOracles . has ( oracleId ) ) {
59
+ console . log ( 'ORACLEDATA polling oracle via RPC' , oracleToPoll . publicKey . toBase58 ( ) ) ;
60
+ await this . addOracleToAccountLoader ( oracleToPoll ) ;
61
+ this . pollingOracles . set ( oracleId , true ) ;
62
+ }
63
+ }
64
+ } , 70_000 ) ;
65
+ }
66
+
67
+ public removeAllOraclesFromAccountLoader ( ) {
68
+ for ( const oracleInfo of this . oracleInfos ) {
69
+ const existingAccountToLoad = this . accountLoader . accountsToLoad . get ( oracleInfo . publicKey . toString ( ) ) ;
70
+ if ( existingAccountToLoad ) {
71
+ console . log ( 'ORACLEDATA remove from account loader' , oracleInfo . publicKey . toBase58 ( ) ) ;
72
+ for ( const [ callbackId ] of existingAccountToLoad . callbacks ) {
73
+ this . accountLoader . removeAccount ( oracleInfo . publicKey , callbackId ) ;
74
+ }
75
+ }
76
+ }
77
+ }
78
+
79
+ public override async unsubscribe ( ) : Promise < void > {
80
+ clearInterval ( this . oraclePollIntervalId ) ;
81
+ await super . unsubscribe ( ) ;
82
+ this . oracleLastUpdate . clear ( ) ;
83
+ this . pollingOracles . clear ( ) ;
84
+ }
85
+ }
0 commit comments