-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstats_server_ravelled
More file actions
executable file
·146 lines (129 loc) · 3.73 KB
/
stats_server_ravelled
File metadata and controls
executable file
·146 lines (129 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/python3
#+
# DBussy/Ravel example -- a server which performs various useless
# statistical calculations on numbers which are passed to it. The
# interface class defined here shows different ways of getting arguments
# out of the request message and returning results.
#
# Try the accompanying stats_client_xxx script after starting this
# running and observe the results.
#
# Copyright 2017 by Lawrence D'Oliveiro <ldo@geek-central.gen.nz>. This
# script is licensed CC0
# <https://creativecommons.org/publicdomain/zero/1.0/>; do with it
# what you will.
#-
import signal
import random
import statistics
import asyncio
import dbussy
import ravel
stats_bus_name = "com.example.stats"
stats_path_name = "/com/example/stats"
stats_iface_name = "com.example.stats"
loop = dbussy.get_event_loop()
bus = ravel.session_bus()
bus.attach_asyncio(loop)
signal.signal(signal.SIGINT, signal.SIG_DFL) # abort without raising KeyboardInterrupt on CTRL/C
#+
# Interface
#-
@ravel.interface(ravel.INTERFACE.SERVER, name = stats_iface_name)
class Stats :
@ravel.method \
(
name = "mean_and_deviation",
in_signature = "ad",
out_signature = "ddd",
arg_keys = ["numbers"],
result_keyword = "results",
result_attrs = ["mean", "stdev", "pstdev"],
)
def mean_and_deviation(self, numbers, results) :
try :
results.mean = statistics.mean(numbers)
results.stdev = statistics.stdev(numbers)
results.pstdev = statistics.pstdev(numbers)
except statistics.StatisticsError as err :
raise ravel.ErrorReturn(ravel.DBUS.ERROR_INVALID_ARGS, "StatisticsError: %s" % err.args[0])
#end try
#end mean_and_deviation
if hasattr(statistics, "harmonic_mean") :
# Python 3.6 or later
@ravel.method \
(
name = "harmonic_mean",
in_signature = "ad",
args_keyword = "args",
out_signature = "d",
set_result_keyword = "set_result",
)
def harmonic_mean(self, args, set_result) :
numbers = args[0]
result = statistics.harmonic_mean(numbers)
set_result([result])
#end harmonic_mean
#end if
@ravel.method \
(
name = "hi_lo",
in_signature = "ad",
out_signature = "ddd",
result_keys = ["median", "max", "min"],
)
def hi_lo(self, numbers) :
return \
{
"median" : statistics.median(numbers),
"max" : max(numbers),
"min" : min(numbers),
}
#end hi_lo
@ravel.method \
(
name = "pick_one",
in_signature = "ss",
out_signature = "s",
arg_keys = ["heads", "tails"],
result_keyword = "result",
)
def pick_one(self, heads, tails, result) :
result[0] = (heads, tails)[random.randrange(2)]
#end pick_one
@ravel.signal \
(
name = "something_happened",
in_signature = "s",
arg_keys = ["msg"],
)
def something_happened(self, msg) : pass
@ravel.method \
(
name = "dont_call_me",
in_signature = "",
out_signature = "",
reply = False,
deprecated = True
)
def dont_call_me(self) :
bus.send_signal \
(
path = stats_path_name,
interface = stats_iface_name,
name = "something_happened",
args = ["deprecated method “dont_call_me” called"]
)
#end dont_call_me
#end Stats
#+
# Mainline
#-
bus.request_name(bus_name = stats_bus_name, flags = ravel.DBUS.NAME_FLAG_DO_NOT_QUEUE)
bus.register \
(
path = "/",
fallback = True,
interface = Stats()
)
loop.run_forever()