Skip to content
This repository was archived by the owner on Dec 20, 2023. It is now read-only.

Commit f32715e

Browse files
authored
Merge pull request #620 from erjiaqing/wdm_events
[WDM] Add GetEvents() in WeaveDataManagementClient
2 parents cb24fe7 + 289c76f commit f32715e

23 files changed

+934
-59
lines changed

src/device-manager/WeaveDataManagementClient.cpp

Lines changed: 378 additions & 32 deletions
Large diffs are not rendered by default.

src/device-manager/WeaveDataManagementClient.h

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,14 @@
3636
#include <SystemLayer/SystemPacketBuffer.h>
3737
#include <Weave/Profiles/data-management/SubscriptionClient.h>
3838
#include <Weave/Profiles/data-management/Current/GenericTraitCatalogImpl.h>
39+
#include <Weave/Profiles/data-management/Current/EventProcessor.h>
3940
#include <map>
41+
#include <memory>
4042
#include <vector>
4143
#include "WeaveDeviceManager.h"
4244

45+
#include <chrono>
46+
4347
namespace nl {
4448
namespace Weave {
4549
namespace DeviceManager {
@@ -70,6 +74,7 @@ class NL_DLL_EXPORT BytesData
7074

7175
class GenericTraitUpdatableDataSink;
7276
class WdmClient;
77+
class WdmEventProcessor;
7378

7479
class WdmClientFlushUpdateStatus
7580
{
@@ -167,6 +172,7 @@ class NL_DLL_EXPORT GenericTraitUpdatableDataSink : public nl::Weave::Profiles::
167172
class NL_DLL_EXPORT WdmClient
168173
{
169174
friend class GenericTraitUpdatableDataSink;
175+
friend class WdmEventProcessor;
170176

171177
public:
172178
enum
@@ -189,7 +195,10 @@ class NL_DLL_EXPORT WdmClient
189195
WEAVE_ERROR FlushUpdate(void * apAppReqState, DMFlushUpdateCompleteFunct onComplete, DMErrorFunct onError);
190196

191197
WEAVE_ERROR RefreshData(void * apAppReqState, DMCompleteFunct onComplete, DMErrorFunct onError,
192-
GetDataHandleFunct getDataHandleCb);
198+
GetDataHandleFunct getDataHandleCb, bool aFetchEvents = false);
199+
200+
WEAVE_ERROR GetEvents(BytesData * aBytes);
201+
void SetEventFetchingTimeout(uint32_t aTimeoutSec);
193202

194203
void * mpAppState;
195204

@@ -216,9 +225,8 @@ class NL_DLL_EXPORT WdmClient
216225
static void ClientEventCallback(void * const aAppState, SubscriptionClient::EventID aEvent,
217226
const SubscriptionClient::InEventParam & aInParam,
218227
SubscriptionClient::OutEventParam & aOutParam);
219-
220228
WEAVE_ERROR RefreshData(void * apAppReqState, void * apContext, DMCompleteFunct onComplete, DMErrorFunct onError,
221-
GetDataHandleFunct getDataHandleCb);
229+
GetDataHandleFunct getDataHandleCb, bool aWithEvents);
222230
WEAVE_ERROR GetDataSink(const ResourceIdentifier & aResourceId, uint32_t aProfileId, uint64_t aInstanceId,
223231
GenericTraitUpdatableDataSink *& apGenericTraitUpdatableDataSink);
224232
WEAVE_ERROR SubscribePublisherTrait(const ResourceIdentifier & aResourceId, const uint64_t & aInstanceId,
@@ -230,6 +238,10 @@ class NL_DLL_EXPORT WdmClient
230238
PropertyPathHandle mPropertyPathHandle, uint32_t aReason, uint32_t aStatusProfileId,
231239
uint16_t aStatusCode);
232240

241+
WEAVE_ERROR ProcessEvent(nl::Weave::TLV::TLVReader inReader, const EventProcessor::EventHeader & inEventHeader);
242+
243+
WEAVE_ERROR PrepareLastObservedEventList(uint32_t & aEventListLen);
244+
233245
GenericTraitSinkCatalog mSinkCatalog;
234246
TraitPath * mpPublisherPathList;
235247

@@ -240,7 +252,39 @@ class NL_DLL_EXPORT WdmClient
240252
OpState mOpState;
241253
std::vector<std::string> mFailedPaths;
242254
std::vector<WdmClientFlushUpdateStatus> mFailedFlushPathStatus;
255+
256+
std::unique_ptr<WdmEventProcessor> mpWdmEventProcessor;
257+
std::string mEventStrBuffer;
258+
259+
// Three flags for event fetching
260+
// If EventFetching is enabled
261+
// If we limit the event fetching time
262+
// If event fetching time limit exceeded (TLE)
263+
bool mEnableEventFetching, mLimitEventFetchTimeout, mEventFetchingTLE;
264+
265+
std::chrono::time_point<std::chrono::system_clock> mEventFetchTimeout;
266+
std::chrono::seconds mEventFetchTimeLimit;
267+
SubscriptionClient::LastObservedEvent mLastObservedEventByImportance[kImportanceType_Last - kImportanceType_First + 1];
268+
SubscriptionClient::LastObservedEvent
269+
mLastObservedEventByImportanceForSending[kImportanceType_Last - kImportanceType_First + 1];
243270
};
271+
272+
class WdmEventProcessor : public EventProcessor
273+
{
274+
public:
275+
WdmEventProcessor(uint64_t aNodeId, WdmClient * aWdmClient);
276+
virtual ~WdmEventProcessor() = default;
277+
278+
protected:
279+
WEAVE_ERROR ProcessEvent(nl::Weave::TLV::TLVReader inReader, nl::Weave::Profiles::DataManagement::SubscriptionClient & inClient,
280+
const EventHeader & inEventHeader) override;
281+
282+
WEAVE_ERROR GapDetected(const EventHeader & inEventHeader) override;
283+
284+
private:
285+
WdmClient * mWdmClient;
286+
};
287+
244288
} // namespace DeviceManager
245289
} // namespace Weave
246290
} // namespace nl

src/device-manager/cocoa/NLWdmClient.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ typedef void (^WdmClientFailureBlock)(id owner, NSError * error);
5959
*/
6060
- (void)setNodeId:(uint64_t)nodeId;
6161

62+
6263
/**
6364
* Create the new data newDataSink
6465
*
@@ -89,4 +90,38 @@ typedef void (^WdmClientFailureBlock)(id owner, NSError * error);
8990
*/
9091
- (void)refreshData:(WdmClientCompletionBlock)completionHandler failure:(WdmClientFailureBlock)failureHandler;
9192

93+
/**
94+
* Begins a sync of all events. The result of this operation can be observed through the CompletionHandler and
95+
* failureHandler, the procedure will be terminated after reaching timeout.
96+
*/
97+
- (void)fetchEvents:(WdmClientCompletionBlock)completionHandler failure:(WdmClientFailureBlock)failureHandler
98+
timeoutSec:(uint32_t)timeoutSec;
99+
100+
/**
101+
* getEvents will return a list of event data in json array representation.
102+
* If no events have been fetched, an empty array ("[]") will be returned.
103+
* The internal buffer will be cleared when beginFetchEvents() is called.
104+
* Each element in this array should be an object
105+
*
106+
* Field | Type | Description
107+
* -----------------------+-------------+--------------
108+
* Source | uint64 | Event Header
109+
* Importance | int (enum)
110+
* Id | uint64
111+
* RelatedImportance | uint64
112+
* RelatedId | uint64
113+
* UTCTimestamp | uint64
114+
* ResourceId | uint64
115+
* TraitProfileId | uint64
116+
* TraitInstanceId | uint64
117+
* Type | uint64
118+
* DeltaUTCTime | int
119+
* DeltaSystemTime | int
120+
* PresenceMask | uint64
121+
* DataSchemaVersionRange | Object{MinVersion: uint64, MaxVersion: uint64}
122+
* Data | Object | Event Trait Data
123+
*/
124+
- (void)getEvents:(NSString **)events;
125+
126+
92127
@end

src/device-manager/cocoa/NLWdmClient.mm

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,5 +547,61 @@ - (void)refreshData:(WdmClientCompletionBlock)completionHandler failure:(WdmClie
547547
});
548548
}
549549

550+
- (void)fetchEvents:(WdmClientCompletionBlock)completionHandler failure:(WdmClientFailureBlock)failureHandler timeoutSec:(uint32_t)timeoutSec
551+
{
552+
WDM_LOG_METHOD_SIG();
553+
554+
NSString * taskName = @"FetchEvents";
555+
556+
dispatch_sync(_mWeaveWorkQueue, ^() {
557+
_mWeaveCppWdmClient->SetEventFetchingTimeout(timeoutSec);
558+
});
559+
560+
// we use async for the results are sent back to caller via async means also
561+
dispatch_async(_mWeaveWorkQueue, ^() {
562+
if (nil == _mRequestName) {
563+
_mRequestName = taskName;
564+
_mCompletionHandler = [completionHandler copy];
565+
_mFailureHandler = [failureHandler copy];
566+
567+
WEAVE_ERROR err = _mWeaveCppWdmClient->RefreshData((__bridge void *) self, onWdmClientComplete, onWdmClientError, NULL, true);
568+
569+
if (WEAVE_NO_ERROR != err) {
570+
[self dispatchAsyncDefaultFailureBlockWithCode:err];
571+
}
572+
} else {
573+
WDM_LOG_ERROR(@"%@: Attemp to %@ while we're still executing %@, ignore", _name, taskName, _mRequestName);
574+
575+
// do not change _mRequestName, as we're rejecting this request
576+
[self dispatchAsyncFailureBlock:WEAVE_ERROR_INCORRECT_STATE taskName:taskName handler:failureHandler];
577+
}
578+
});
579+
}
580+
581+
- (WEAVE_ERROR)getEvents:(NSString **)events
582+
{
583+
__block WEAVE_ERROR err = WEAVE_NO_ERROR;
584+
__block nl::Weave::DeviceManager::BytesData bytesData;
585+
586+
WDM_LOG_METHOD_SIG();
587+
588+
VerifyOrExit(NULL != _mWeaveCppWdmClient, err = WEAVE_ERROR_INCORRECT_STATE);
589+
590+
// need this bracket to use Verify macros
591+
{
592+
// we use sync so the bytesData is immediately available to the caller upon return
593+
dispatch_sync(_mWeaveWorkQueue, ^() {
594+
err = _mWeaveCppWdmClient->GetEvents(&bytesData);
595+
});
596+
}
597+
598+
exit:
599+
if (WEAVE_NO_ERROR == err)
600+
{
601+
*events = [[NSString alloc] initWithBytes:bytesData.mpDataBuf length:bytesData.mDataLen encoding:NSUTF8StringEncoding];
602+
}
603+
return err;
604+
}
605+
550606
@end
551607
#endif // WEAVE_CONFIG_DATA_MANAGEMENT_CLIENT_EXPERIMENTAL

src/device-manager/java/WeaveDeviceManager-JNI.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ extern "C" {
184184
NL_DLL_EXPORT jlong Java_nl_Weave_DataManagement_WdmClientImpl_newDataSink(JNIEnv *env, jobject self, jlong wdmClientPtr, jobject resourceIdentifierObj, jlong profileId, jlong instanceId, jstring path);
185185
NL_DLL_EXPORT void Java_nl_Weave_DataManagement_WdmClientImpl_beginFlushUpdate(JNIEnv *env, jobject self, jlong wdmClientPtr);
186186
NL_DLL_EXPORT void Java_nl_Weave_DataManagement_WdmClientImpl_beginRefreshData(JNIEnv *env, jobject self, jlong wdmClientPtr);
187+
NL_DLL_EXPORT void Java_nl_Weave_DataManagement_WdmClientImpl_beginFetchEvents(JNIEnv *env, jobject self, jlong wdmClientPtr, jint timeoutSec);
188+
NL_DLL_EXPORT jstring Java_nl_Weave_DataManagement_WdmClientImpl_getEvents(JNIEnv *env, jobject self, jlong wdmClientPtr);
187189
NL_DLL_EXPORT void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_init(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr);
188190
NL_DLL_EXPORT void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_shutdown(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr);
189191
NL_DLL_EXPORT void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_clear(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr);
@@ -3995,6 +3997,47 @@ void Java_nl_Weave_DataManagement_WdmClientImpl_beginRefreshData(JNIEnv *env, jo
39953997
}
39963998
}
39973999

4000+
void Java_nl_Weave_DataManagement_WdmClientImpl_beginFetchEvents(JNIEnv *env, jobject self, jlong wdmClientPtr, jint timeoutSec)
4001+
{
4002+
WEAVE_ERROR err = WEAVE_NO_ERROR;
4003+
WdmClient *wdmClient = (WdmClient *)wdmClientPtr;
4004+
4005+
WeaveLogProgress(DeviceManager, "beginFetchEvents() called");
4006+
4007+
pthread_mutex_lock(&sStackLock);
4008+
wdmClient->SetEventFetchingTimeout((uint32_t)timeoutSec);
4009+
err = wdmClient->RefreshData((void *)"FetchEvents", HandleWdmClientComplete, HandleWdmClientError, NULL, true);
4010+
pthread_mutex_unlock(&sStackLock);
4011+
4012+
if (err != WEAVE_NO_ERROR && err != WDM_JNI_ERROR_EXCEPTION_THROWN)
4013+
{
4014+
ThrowError(env, err);
4015+
}
4016+
}
4017+
4018+
4019+
jstring Java_nl_Weave_DataManagement_WdmClientImpl_getEvents(JNIEnv *env, jobject self, jlong wdmClientPtr)
4020+
{
4021+
WEAVE_ERROR err = WEAVE_NO_ERROR;
4022+
WdmClient *wdmClient = (WdmClient *)wdmClientPtr;
4023+
BytesData bytesData;
4024+
4025+
WeaveLogProgress(DeviceManager, "getEvents() called");
4026+
4027+
err = wdmClient->GetEvents(&bytesData);
4028+
SuccessOrExit(err);
4029+
4030+
exit:
4031+
4032+
if (err != WEAVE_NO_ERROR && err != WDM_JNI_ERROR_EXCEPTION_THROWN)
4033+
{
4034+
ThrowError(env, err);
4035+
}
4036+
4037+
// bytesData.mpDataBuf should be a pointer get by mEventBuffer.c_str()
4038+
return env->NewStringUTF(reinterpret_cast<const char *>(bytesData.mpDataBuf));
4039+
}
4040+
39984041
void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_init(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr)
39994042
{
40004043
GenericTraitUpdatableDataSink *pDataSink = NULL;

src/device-manager/java/src/nl/Weave/DataManagement/WdmClient.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,51 @@ public interface WdmClient
6767
*/
6868
public void beginRefreshData();
6969

70+
/**
71+
* The result of this operation can be observed through the {@link CompletionHandler}
72+
* that has been assigned via {@link #setCompletionHandler}.
73+
* The fetched events can be accessed via getEvents()
74+
*
75+
* @param timeoutSec operation timeoutSec, this operation might take a long time if the number of events is too large.
76+
*/
77+
public void beginFetchEvents(int timeoutSec);
78+
7079
public CompletionHandler getCompletionHandler();
7180
public void setCompletionHandler(CompletionHandler compHandler);
7281

7382
public GenericTraitUpdatableDataSink getDataSink(long traitInstancePtr);
7483

84+
/**
85+
* getEvents will return a list of event data in json array representation.
86+
* If no events have been fetched, an empty array ("[]") will be returned.
87+
* The internal buffer will be cleared when beginFetchEvents() is called.
88+
* Each element in this array should be an object
89+
*
90+
* Field | Type | Description
91+
* -----------------------+-------------+--------------
92+
* Source | uint64 | Event Header
93+
* Importance | int (enum)
94+
* Id | uint64
95+
* RelatedImportance | uint64
96+
* RelatedId | uint64
97+
* UTCTimestamp | uint64
98+
* ResourceId | uint64
99+
* TraitProfileId | uint64
100+
* TraitInstanceId | uint64
101+
* Type | uint64
102+
* DeltaUTCTime | int
103+
* DeltaSystemTime | int
104+
* PresenceMask | uint64
105+
* DataSchemaVersionRange | Object{MinVersion: uint64, MaxVersion: uint64}
106+
* Data | Object | Event Trait Data
107+
*/
108+
public String getEvents();
109+
75110
public interface CompletionHandler
76111
{
77112
void onFlushUpdateComplete(Throwable[] exceptions, WdmClient wdmClient);
78113
void onRefreshDataComplete();
114+
void onFetchEventsComplete();
79115
void onError(Throwable err);
80116
}
81117
};

src/device-manager/java/src/nl/Weave/DataManagement/WdmClientImpl.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ public void beginRefreshData()
123123
beginRefreshData(mWdmClientPtr);
124124
}
125125

126+
@Override
127+
public void beginFetchEvents(int timeoutSec)
128+
{
129+
ensureNotClosed();
130+
beginFetchEvents(mWdmClientPtr, timeoutSec);
131+
}
132+
126133
@Override
127134
public CompletionHandler getCompletionHandler()
128135
{
@@ -135,6 +142,13 @@ public void setCompletionHandler(CompletionHandler compHandler)
135142
mCompHandler = compHandler;
136143
}
137144

145+
@Override
146+
public String getEvents()
147+
{
148+
ensureNotClosed();
149+
return getEvents(mWdmClientPtr);
150+
}
151+
138152
public GenericTraitUpdatableDataSink getDataSink(long traitInstancePtr)
139153
{
140154
GenericTraitUpdatableDataSink trait = null;
@@ -179,6 +193,11 @@ private void onRefreshDataComplete()
179193
requireCompletionHandler().onRefreshDataComplete();
180194
}
181195

196+
private void onFetchEventsComplete()
197+
{
198+
requireCompletionHandler().onFetchEventsComplete();
199+
}
200+
182201
private void ensureNotClosed() {
183202
if (mWdmClientPtr == 0) {
184203
throw new IllegalStateException("This WdmClient has already been closed.");
@@ -205,4 +224,6 @@ private CompletionHandler requireCompletionHandler() {
205224
private native long newDataSink(long wdmClientPtr, ResourceIdentifier resourceIdentifier, long profileId, long instanceId, String path);
206225
private native void beginFlushUpdate(long wdmClientPtr);
207226
private native void beginRefreshData(long wdmClientPtr);
227+
private native void beginFetchEvents(long wdmClientPtr, int timeoutSec);
228+
private native String getEvents(long wdmClientPtr);
208229
};

0 commit comments

Comments
 (0)