-
Notifications
You must be signed in to change notification settings - Fork 70
Expand file tree
/
Copy pathget_all_para.cc
More file actions
128 lines (106 loc) · 3.34 KB
/
get_all_para.cc
File metadata and controls
128 lines (106 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*
How to aggregate the results of a ParallelWork
https://github.com/sogou/workflow/issues/140
*/
#include <signal.h>
#include <string.h>

#include <mutex>
#include <string>
#include <vector>

#include <spdlog/spdlog.h>
#include <workflow/WFFacilities.h>
#include <workflow/WFHttpServer.h>
#include <workflow/WFTaskFactory.h>
// HttpRequest / HttpResponse are used unqualified below; presumably they come
// from this namespace.
using namespace protocol;
// Locked by fetch_callback() while it appends to a series_context's resp_list.
// This is a single file-scope lock, so it is shared by every request being
// handled, not one lock per context.
std::mutex mutex;
/*
 * Per-request state stored as the context of the server task's series.
 *
 * process() allocates one instance, fetch_callback() appends to it and
 * parallel_callback() reads it back to build the reply.
 */
struct series_context
{
    // Responses collected from the fetch tasks; appended under the global mutex.
    std::vector<HttpResponse> resp_list;
    // Server task whose response is filled in by parallel_callback().
    // Defaults to nullptr so a freshly allocated context never carries an
    // indeterminate pointer.
    WFHttpTask *server_task = nullptr;
};
/*
 * Callback of one fetch task created in create_fetch_paralell().
 *
 * Walks from the task's own series (whose context is the owning ParallelWork)
 * up to the series that contains that ParallelWork, and moves the task's
 * response into the series_context stored there.
 */
void fetch_callback(WFHttpTask *fetch_task)
{
    spdlog::info("fetch cb start");

    // Sub-series context -> ParallelWork -> enclosing series context.
    auto *owner = static_cast<ParallelWork *>(series_of(fetch_task)->get_context());
    auto *shared = static_cast<series_context *>(series_of(owner)->get_context());
    HttpResponse &fetched = *fetch_task->get_resp();

    spdlog::info("save resp in context");
    {
        // Callbacks of sibling fetch tasks may append to the same list, so
        // the insertion is serialised through the global mutex.
        std::lock_guard<std::mutex> guard(mutex);
        shared->resp_list.emplace_back(std::move(fetched));
    }
}
void parallel_callback(const ParallelWork *pwork)
{
SeriesWork *series = series_of(pwork);
series_context *context = static_cast<series_context *>(series->get_context());
const void *body;
size_t size;
for (auto &resp : context->resp_list)
{
if (resp.get_parsed_body(&body, &size))
{
spdlog::info("resp size : {}", size);
// fwrite(body, 1, size, stdout); // for test
context->server_task->get_resp()->append_output_body_nocopy(body, size);
}
}
spdlog::info("All series in this parallel have done");
}
/*
 * Builds a ParallelWork holding one single-task series per target URL.
 *
 * Each sub-series gets the ParallelWork as its context so fetch_callback()
 * can find its way back to it. parallel_callback() is registered as the
 * ParallelWork's callback. The caller receives the ParallelWork not yet
 * started.
 */
ParallelWork *create_fetch_paralell()
{
    const std::string targets[] = {
        "http://www.baidu.com",
        "http://www.bing.com",
        "http://www.sogo.com",
    };
    // Second and third arguments of create_http_task; presumably the redirect
    // and retry limits - confirm against the workflow headers.
    const int redirect_limit = 4;
    const int retry_limit = 2;

    ParallelWork *group = Workflow::create_parallel_work(parallel_callback);
    for (const std::string &target : targets)
    {
        WFHttpTask *fetch = WFTaskFactory::create_http_task(target,
                                                            redirect_limit,
                                                            retry_limit,
                                                            fetch_callback);
        SeriesWork *branch = Workflow::create_series_work(fetch, nullptr);
        branch->set_context(group);
        group->add_series(branch);
    }
    return group;
}
/*
 * Request handler passed to WFHttpServer.
 *
 * For the URI "/test": allocates a series_context, stores it as the context of
 * the server task's series, and appends the fetch ParallelWork to that series
 * so the fetched bodies can be merged into the reply. The context is released
 * in the server task's callback.
 *
 * For any other URI: replies with a fixed HTML body.
 */
void process(WFHttpTask *server_task)
{
    HttpRequest *req = server_task->get_req();
    if (strcmp(req->get_request_uri(), "/test") != 0)
    {
        server_task->get_resp()->append_output_body("<html>server other response</html>");
        return;
    }

    ParallelWork *pwork = create_fetch_paralell();
    SeriesWork *series = series_of(server_task);
    series_context *context = new series_context;
    context->server_task = server_task;
    series->set_context(context);
    series->push_back(pwork);

    // The callback is stored here and invoked by the framework later, after
    // this function has returned. The pointer must therefore be captured BY
    // VALUE: capturing the local variable `context` by reference would leave
    // the lambda holding a dangling reference and make the delete undefined
    // behaviour.
    server_task->set_callback([context](WFHttpTask *)
    {
        spdlog::info("delete context");
        delete context;
    });
}
// main() blocks on this group after the server has started; sig_handler()
// calls done() on it when SIGINT arrives. Constructed with a count of 1.
static WFFacilities::WaitGroup wait_group(1);
/*
 * Signal handler installed by main() for SIGINT.
 * Marks the global wait_group as done; main() is waiting on that group.
 */
void sig_handler(int signo)
{
    static_cast<void>(signo); // the signal number itself is not used
    wait_group.done();
}
/*
 * Entry point: installs the SIGINT handler, starts the HTTP server on port
 * 8888 with process() as the request handler, waits until wait_group is
 * released, then stops the server.
 *
 * Returns 0 in every case, including when the server fails to start.
 */
int main()
{
    signal(SIGINT, sig_handler);

    WFHttpServer server(process);
    const bool started = (server.start(8888) == 0);
    if (!started)
        return 0;

    wait_group.wait();
    server.stop();
    return 0;
}