1+ /* *
2+ * \file src/plugins/output/forwarder/src/Connection.h
3+ * \author Michal Sedlak <[email protected] > 4+ * \brief Buffered socket connection
5+ * \date 2021
6+ */
7+
8+ /* Copyright (C) 2021 CESNET, z.s.p.o.
9+ *
10+ * Redistribution and use in source and binary forms, with or without
11+ * modification, are permitted provided that the following conditions
12+ * are met:
13+ * 1. Redistributions of source code must retain the above copyright
14+ * notice, this list of conditions and the following disclaimer.
15+ * 2. Redistributions in binary form must reproduce the above copyright
16+ * notice, this list of conditions and the following disclaimer in
17+ * the documentation and/or other materials provided with the
18+ * distribution.
19+ * 3. Neither the name of the Company nor the names of its contributors
20+ * may be used to endorse or promote products derived from this
21+ * software without specific prior written permission.
22+ *
23+ * ALTERNATIVELY, provided that this notice is retained in full, this
24+ * product may be distributed under the terms of the GNU General Public
25+ * License (GPL) version 2 or later, in which case the provisions
26+ * of the GPL apply INSTEAD OF those given above.
27+ *
28+ * This software is provided ``as is'', and any express or implied
29+ * warranties, including, but not limited to, the implied warranties of
30+ * merchantability and fitness for a particular purpose are disclaimed.
31+ * In no event shall the company or contributors be liable for any
32+ * direct, indirect, incidental, special, exemplary, or consequential
33+ * damages (including, but not limited to, procurement of substitute
34+ * goods or services; loss of use, data, or profits; or business
35+ * interruption) however caused and on any theory of liability, whether
36+ * in contract, strict liability, or tort (including negligence or
37+ * otherwise) arising in any way out of the use of this software, even
38+ * if advised of the possibility of such damage.
39+ *
40+ */
41+
42+ #pragma once
43+
44+ #include " ConnectionManager.h"
45+ #include " ConnectionParams.h"
46+ #include " ConnectionBuffer.h"
47+
48+ #include < sys/socket.h>
49+ #include < netinet/in.h>
50+ #include < arpa/inet.h>
51+ #include < unistd.h>
52+
53+ #include < atomic>
54+ #include < mutex>
55+ #include < cstdint>
56+
57+ static constexpr int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024 ;
58+
59+ class ConnectionManager ;
60+
61+ class Connection
62+ {
63+ friend class ConnectionManager ;
64+
65+ public:
66+ // / Flag indicating that the connection was lost and the forwarder needs to resend templates etc.
67+ // / The flag won't be reset when the connection is reestablished!
68+ std::atomic<bool > connection_lost_flag { false };
69+
70+ Connection (ConnectionManager &manager, ConnectionParams params, long buffer_size = DEFAULT_BUFFER_SIZE);
71+
72+ bool
73+ connect ();
74+
75+ std::unique_lock<std::mutex>
76+ begin_write ();
77+
78+ bool
79+ write (void *data, long length);
80+
81+ bool
82+ send_some ();
83+
84+ void
85+ commit_write ();
86+
87+ void
88+ rollback_write ();
89+
90+ long
91+ writeable ();
92+
93+ void
94+ close ();
95+
96+ ~Connection ();
97+
98+ private:
99+ // / The manager managing this connection
100+ ConnectionManager &manager;
101+
102+ // / The parameters to estabilish the connection
103+ ConnectionParams params;
104+
105+ // / The connection socket
106+ int sockfd = -1 ;
107+
108+ // / Buffer for the data to send and a mutex guarding it
109+ // / (buffer will be accessed from sender thread and writer thread)
110+ std::mutex buffer_mutex;
111+ ConnectionBuffer buffer;
112+
113+ // / Flag indicating whether the buffer has any data to send so we don't have to lock the mutex every time
114+ // / (doesn't need to be atomic because we only set it while holding the mutex)
115+ bool has_data_to_send = false ;
116+
117+ // / Flag indicating that the connection has been closed and can be disposed of after the data is sent
118+ std::atomic<bool > close_flag { false };
119+ };
0 commit comments