VSM C++ SDK
Vehicle Specific Modules SDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
socket_processor.h
Go to the documentation of this file.
1 // Copyright (c) 2018, Smart Projects Holdings Ltd
2 // All rights reserved.
3 // See LICENSE file for license details.
4 
10 #ifndef _UGCS_VSM_SOCKET_PROCESSOR_H_
11 #define _UGCS_VSM_SOCKET_PROCESSOR_H_
12 
13 #include <ugcs/vsm/io_request.h>
15 #include <ugcs/vsm/singleton.h>
16 #include <ugcs/vsm/socket_address.h>
17 
18 #include <thread>
19 #include <unordered_map>
20 #include <vector>
21 
22 namespace ugcs {
23 namespace vsm {
24 
26 public:
27  Local_interface(const std::string& name);
28 public:
29  std::string name;
30  bool is_multicast = false;
31  bool is_loopback = false;
32  std::vector<Socket_address::Ptr> adresses;
33 };
34 
37 {
39 public:
46 
47  virtual
49 
51  static Ptr
52  Create();
53 
55  template <typename... Args>
56  static Ptr
57  Get_instance(Args &&... args)
58  {
59  return singleton.Get_instance(std::forward<Args>(args)...);
60  }
61 
63  class Stream: public Io_stream
64  {
66  public:
67  template<typename... Args>
68  Stream(Socket_processor::Ptr processor, Args&& ...args):
69  Io_stream(std::forward<Args>(args)...),
70  processor(processor)
71  {}
72 
73  typedef Reference_guard<Stream::Ptr> Ref;
74 
75  ~Stream();
76 
77  void
78  Set_state(Io_stream::State state);
79 
80  void
81  Set_connect_request(Io_request::Ptr request);
82 
83  void
84  Close_socket();
85 
87  Get_connect_request();
88 
89  void
90  Abort_pending_requests(Io_result result = Io_result::CLOSED);
91 
94 
99  virtual Operation_waiter
100  Read_from(
101  size_t max_to_read,
102  Read_from_handler completion_handler,
104 
107  virtual Operation_waiter
108  Write_to(
110  Socket_address::Ptr dest_addr,
111  Write_handler completion_handler,
113 
114  /* Returns a newly created Socket_address::Ptr with peer address. */
115  Socket_address::Ptr
116  Get_peer_address();
117 
118  /* Set peer address for udp stream
119  * All subsequent Write() calls will go to this address.*/
120  void
121  Set_peer_address(Socket_address::Ptr);
122 
123  /* Returns a newly created Socket_address::Ptr with local address. */
124  Socket_address::Ptr
125  Get_local_address();
126 
127  bool
128  Add_multicast_group(Socket_address::Ptr interface, Socket_address::Ptr multicast);
129 
130  bool
131  Remove_multicast_group(Socket_address::Ptr interface, Socket_address::Ptr multicast);
132 
133  /* Enable/disable sending of broadcast packets.
134  * Valid for SOCK_DGRAM streams only
135  * @param enable true - enable sending of broadcast packets
136  * false - disable sending of broadcast packets
137  * @return true on success */
138  bool
139  Enable_broadcast(bool enable);
140 
141  typedef std::unique_ptr<std::vector<uint8_t>> Buf_ptr;
142 
143  private:
144  template<typename T>
145  class Circular_buffer {
146  public:
147  bool
148  Push(T&& item) {
149  bool ret = true;
150  if (writer == reader) {
151  if (is_empty) {
152  if (buffer.size() < MAX_CACHED_COUNT) {
153  buffer.resize(MAX_CACHED_COUNT);
154  }
155  is_empty = false;
156  } else {
157  // buffer full.
158  reader++;
159  if (reader == buffer.size()) {
160  reader = 0;
161  }
162  ret = true;
163  }
164  }
165  buffer[writer] = std::move(item);
166  writer++;
167  if (writer == buffer.size()) {
168  writer = 0;
169  }
170  return ret;
171  }
172 
173  bool
174  Pull(T& ret) { // NOLINT(runtime/references)
175  if (is_empty) {
176  return false;
177  }
178  ret = std::move(buffer[reader]);
179  reader++;
180  if (reader == buffer.size()) {
181  reader = 0;
182  }
183  if (reader == writer) {
184  is_empty = true;
185  }
186  return true;
187  }
188  bool
189  Is_empty()
190  {
191  return is_empty;
192  }
193  void
194  Clear()
195  {
196  writer = 0;
197  reader = 0;
198  is_empty = true;
199  buffer.clear();
200  }
201 
202  private:
203  size_t writer = 0;
204  size_t reader = 0;
205  std::vector<T> buffer;
206  bool is_empty = true;
207  };
208 
209  Socket_address::Ptr peer_address = nullptr;
210 
211  Socket_address::Ptr local_address = nullptr;
212 
213  sockets::Socket_handle s = INVALID_SOCKET;
214 
215  Socket_processor::Ptr processor;
216 
217  Io_request::Ptr connect_request;
218 
219  // true if socket was connected using connect() call.
220  // I.e. no need to specify destination when doing send().
221  // Applies to both SOCK_DGRAM and SOCK_STREAM
222  bool is_connected = false;
223 
224  typedef std::pair<Write_request::Ptr, Socket_address::Ptr> Write_requests_entry;
225  std::list<Write_requests_entry> write_requests;
226 
227  typedef std::pair<Read_request::Ptr, Socket_address::Ptr> Read_requests_entry;
228  std::list<Read_requests_entry> read_requests;
229 
230  std::list<Io_request::Ptr> accept_requests;
231 
232  Buf_ptr reading_buffer;
233  size_t read_bytes = 0; // bytes read by current read request
234  size_t written_bytes = 0; // bytes written by current write request
235 
236  // UDP multi-stream specific stuff.
237  typedef std::pair<Buf_ptr, Socket_address::Ptr> Cache_entry;
238  // Accepted UDP streams for this stream/socket.
239  std::unordered_map<Socket_address::Ptr, Stream::Ptr> substreams;
240  // If present then this is a substream of another stream.
241  Stream::Ptr parent_stream = nullptr;
242  // Packet cache. Keeps unread packets until Read called.
243  Circular_buffer<Cache_entry> packet_cache;
244  // maximum packet count the stream will cache.
245  // When cache is full packets will be dropped.
246  static constexpr size_t MAX_CACHED_COUNT = 50;
247 
248  friend class Socket_processor;
249 
250  sockets::Socket_handle
251  Get_socket();
252 
253  void
254  Set_socket(sockets::Socket_handle s);
255 
257  void
258  Update_name();
259 
261  virtual Operation_waiter
262  Write_impl(Io_buffer::Ptr buffer,
263  Offset offset,
264  Write_handler completion_handler,
266 
268  Operation_waiter
269  Read_impl(size_t max_to_read, size_t min_to_read, Offset offset,
270  Read_handler completion_handler,
271  Request_completion_context::Ptr comp_ctx) override;
272 
274  Operation_waiter
275  Close_impl(Close_handler completion_handler,
276  Request_completion_context::Ptr comp_ctx) override;
277 
278  void
279  Process_udp_read_requests();
280  };
281 
284 
287  typedef Callback_proxy<
288  void, // return void
289  std::string, // host name as passed to Get_addr_info() call
290  std::string, // service as passed to Get_addr_info() call
291  std::list<addrinfo>, // list of addrinfo returned from Get_addr_info()
292  Io_result // result of operation: OK|TIMED_OUT|BAD_ADDRESS|CANCELLED
294 
297  typedef Callback_proxy<
298  void, // return void
299  Stream::Ref, // stream
300  Io_result // result of operation: OK|TIMED_OUT|BAD_ADDRESS|CANCELLED
302 
306 
308  Connect(std::string host, std::string service,
309  Connect_handler completion_handler,
311  Io_stream::Type sock_type = Io_stream::Type::TCP,
312  Socket_address::Ptr src_addr = nullptr)
313  {
314  return Connect(
315  Socket_address::Create(host, service),
316  completion_handler,
317  completion_context,
318  sock_type,
319  src_addr);
320  }
321 
323  Connect(Socket_address::Ptr dest_addr,
324  Connect_handler completion_handler,
326  Io_stream::Type sock_type = Io_stream::Type::TCP,
327  Socket_address::Ptr src_addr = nullptr);
328 
341  template <class Callback_ptr>
344  Callback_ptr completion_handler,
346  {
347  Callback_check_type<Callback_ptr, void, Stream::Ref, Io_result>();
348 
349  return Accept_impl(listener, completion_handler, completion_context,
350  completion_handler->template Get_arg<0>(),
351  completion_handler->template Get_arg<1>());
352  }
353 
361  const std::string& host,
362  const std::string& service,
363  addrinfo* hints,
364  Get_addr_info_handler completion_handler,
366 
368  Listen(
369  const std::string& host,
370  const std::string& service,
371  Listen_handler completion_handler,
373  Io_stream::Type sock_type = Io_stream::Type::TCP)
374  {
375  return Listen(Socket_address::Create(host, service), completion_handler, completion_context, sock_type);
376  }
377 
378  Operation_waiter
379  Listen(
380  Socket_address::Ptr addr,
381  Listen_handler completion_handler,
383  Io_stream::Type sock_type = Io_stream::Type::TCP);
384 
400  Operation_waiter
402  Socket_address::Ptr addr,
403  Listen_handler completion_handler,
405  bool multicast = false)
406  {
407  return Listen(addr,
408  completion_handler,
409  completion_context,
410  multicast?Io_stream::Type::UDP_MULTICAST:Io_stream::Type::UDP);
411  }
412 
434  Bind_can(
435  std::string interface,
436  std::vector<int> filter_messges,
437  Listen_handler completion_handler,
439 
440  static std::list<Local_interface>
441  Enumerate_local_interfaces();
442 
443 protected:
445  std::thread thread;
446 
447  Piped_request_waiter::Ptr piped_waiter;
448 
453 
455  void
456  On_enable() override;
457 
459  void
460  On_disable() override;
461 
463  void
465 
466  void
467  On_wait_and_process() override;
468 
469  void
470  On_write(Write_request::Ptr request, Socket_address::Ptr addr = nullptr);
471 
472  void
473  On_read(Read_request::Ptr request, Socket_address::Ptr addr = nullptr);
474 
475  void
476  On_close(Io_request::Ptr request);
477 
478  void
479  On_set_peer_address(Io_request::Ptr request, Socket_address::Ptr addr);
480 
481  void
482  On_connect(Io_request::Ptr request, Stream::Ptr stream);
483 
484  void
485  On_listen(Io_request::Ptr request, Stream::Ptr stream, Socket_address::Ptr addr);
486 
487  void
488  On_bind_can(
489  Io_request::Ptr request,
490  std::vector<int> filter_messges,
491  Stream::Ptr stream,
492  std::string iface_id);
493 
494  void
495  On_accept(Io_request::Ptr request, Stream::Ptr stream, Stream::Ref listenstream);
496 
497  void
498  On_cancel(Io_request::Ptr request, Io_request::Ptr request_to_cancel);
499 
500  void
501  On_get_addr_info(Io_request::Ptr request, Get_addr_info_handler handler);
502 
504  Accept_impl(
505  Socket_listener::Ref listener,
506  Request::Handler completion_handler,
507  Request_completion_context::Ptr completion_context,
508  Stream::Ref& stream_arg,
509  Io_result& result_arg);
510 
511 private:
515  // std::mutex streams_lock;
516 
517  typedef std::unordered_map<Io_stream::Ptr, Stream::Ptr> Streams_map;
518 
519  Streams_map streams;
520 
522  static Singleton<Socket_processor> singleton;
523 
524  Stream::Ptr Lookup_stream(Io_stream::Ptr io_stream);
525 
526  enum class RW_socket_result {
527  DONE,
528  PARTIAL,
529  CLOSED
530  };
531 
532  void
533  Handle_select_accept(Stream::Ptr stream);
534 
535  void
536  Handle_select_connect(Stream::Ptr stream);
537 
538  void
539  Handle_write_requests(Stream::Ptr stream);
540 
541  void
542  Handle_read_requests(Stream::Ptr stream);
543 
544  void
545  Handle_udp_read_requests(Stream::Ptr stream);
546 
548  void
549  Close_stream(Stream::Ptr stream, bool remove_from_streams = true);
550 
551  void
552  Cancel_operation(Io_request::Ptr request_to_cancel);
553 
555  bool
556  Check_for_cancel_request(Io_request::Ptr request, bool force_cancel);
557 };
558 
559 // @{
563 // @}
564 
567  Make_socket_connect_callback,
569  (nullptr, Io_result::OTHER_FAILURE))
570 
571 
573  Make_socket_listen_callback,
574  (Socket_listener::Ref, Io_result),
575  (nullptr, Io_result::OTHER_FAILURE))
576 
579  Make_socket_accept_callback,
580  (Socket_stream::Ref, Io_result),
581  (nullptr, Io_result::OTHER_FAILURE))
582 
585  Make_socket_read_from_callback,
586  (Io_buffer::Ptr, Io_result, Socket_address::Ptr),
587  (nullptr, Io_result::OTHER_FAILURE, nullptr))
588 
591  Make_socket_read_callback,
592  (Io_buffer::Ptr, Io_result),
593  (nullptr, Io_result::OTHER_FAILURE))
594 
595 } /* namespace vsm */
596 } /* namespace ugcs */
597 
598 #endif /* _UGCS_VSM_SOCKET_PROCESSOR_H_ */
static Ptr Get_instance(Args &&...args)
Get global or create new processor instance.
Definition: socket_processor.h:57
void On_disable() override
Handle disable request.
State state
Current state of the stream.
Definition: io_stream.h:323
Singleton class definition.
void Process_on_disable(Request::Ptr)
Process disable in processor context.
#define DEFINE_CALLBACK_BUILDER(__name, __types, __values)
Define callback builder function.
Definition: callback.h:42
int64_t Offset
Offset for read/write operations.
Definition: io_stream.h:75
void On_wait_and_process() override
Called when the default processing loop wants to wait for work and process it.
std::shared_ptr< Socket_processor > Ptr
Pointer type.
Definition: socket_processor.h:38
Operation_waiter Bind_can(std::string interface, std::vector< int > filter_messges, Listen_handler completion_handler, Request_completion_context::Ptr completion_context=Request_temp_completion_context::Create())
Create CAN socket and associate stream with it.
static Ptr Create(Args &&...args)
Create an instance.
Definition: request_temp_completion_context.h:19
Operation_waiter Get_addr_info(const std::string &host, const std::string &service, addrinfo *hints, Get_addr_info_handler completion_handler, Request_completion_context::Ptr completion_context=Request_temp_completion_context::Create())
Interface for nonblocking call to getaddrinfo().
Callback_base< void >::Ptr<> Handler
Callback denoting a handler of the request.
Definition: request_container.h:86
Definition: socket_processor.h:25
Callback_proxy< void, Io_result > Write_handler
Default prototype for write operation completion handler.
Definition: io_stream.h:86
Io_result
Result of I/O operation.
Definition: io_stream.h:37
std::shared_ptr< Stream > Ptr
Pointer type.
Definition: socket_processor.h:65
Socket processor.
Definition: socket_processor.h:36
std::shared_ptr< Request > Ptr
Pointer type.
Definition: request_container.h:38
Abstract I/O stream interface.
Definition: io_stream.h:66
Socket specific stream.
Definition: socket_processor.h:63
std::shared_ptr< Io_request > Ptr
Pointer type.
Definition: io_request.h:22
Request execution context.
Definition: request_context.h:24
Some other system failure.
static Ptr Create()
Create processor instance.
std::thread thread
Worker thread of socket processor.
Definition: socket_processor.h:445
Callback_proxy< void, Io_buffer::Ptr, Io_result > Read_handler
Default prototype for read operation completion handler.
Definition: io_stream.h:89
Listen_handler Connect_handler
Callback used in Connect() is the same as for listen handler.
Definition: socket_processor.h:305
I/O request declaration.
Helper class for proxying callback invocation.
Definition: callback.h:694
Socket_processor::Stream Socket_stream
Convenience types aliases.
Definition: socket_processor.h:561
std::shared_ptr< Request_container > Ptr
Pointer type.
Definition: request_container.h:31
Request_completion_context::Ptr completion_ctx
Default completion context handled by processor thread is used if user did not specify one...
Definition: socket_processor.h:452
Generic I/O buffer.
Definition: io_buffer.h:33
std::shared_ptr< Read_request > Ptr
Shared pointer to read request.
Definition: io_request.h:147
Callback_proxy< void, std::string, std::string, std::list< addrinfo >, Io_result > Get_addr_info_handler
Callback used in Get_addr_info()
Definition: socket_processor.h:293
Callback_proxy< void, Io_buffer::Ptr, Io_result, Socket_address::Ptr > Read_from_handler
Default prototype for read operation completion handler.
Definition: socket_processor.h:93
Generic container for queued requests.
Definition: request_container.h:30
Type
Stream types.
Definition: io_stream.h:109
virtual Operation_waiter Read_from(size_t max_to_read, Read_from_handler completion_handler, Request_completion_context::Ptr comp_ctx=Request_temp_completion_context::Create())
Used to read from udp socket.
std::shared_ptr< Write_request > Ptr
Shared pointer to write request.
Definition: io_request.h:111
std::shared_ptr< Piped_request_waiter > Ptr
Pointer type.
Definition: piped_request_waiter.h:22
std::shared_ptr< Request_context > Ptr
Pointer type.
Definition: request_context.h:25
std::shared_ptr< Io_buffer > Ptr
Pointer type.
Definition: io_buffer.h:34
Operation_waiter Accept(Socket_listener::Ref listener, Callback_ptr completion_handler, Request_completion_context::Ptr completion_context=Request_temp_completion_context::Create())
Accept incoming TCP/UDP connection.
Definition: socket_processor.h:343
#define DEFINE_COMMON_CLASS(__class_name,...)
Use this macro to define some common attributes for a class.
Definition: utils.h:25
std::shared_ptr< Io_stream > Ptr
Pointer type.
Definition: io_stream.h:67
Request waiter which uses a pipe to signal about request submissions.
Definition: piped_request_waiter.h:21
Callback_proxy< void > Close_handler
Default prototype for close operation completion handler.
Definition: io_stream.h:92
Operation_waiter Bind_udp(Socket_address::Ptr addr, Listen_handler completion_handler, Request_completion_context::Ptr completion_context=Request_temp_completion_context::Create(), bool multicast=false)
Create a local UDP endpoint socket and associate a stream with it.
Definition: socket_processor.h:401
Stream Socket_listener
Stream type is used for listener socket type also.
Definition: socket_processor.h:283
virtual Operation_waiter Write_to(Io_buffer::Ptr, Socket_address::Ptr dest_addr, Write_handler completion_handler, Request_completion_context::Ptr comp_ctx=Request_temp_completion_context::Create())
Used to write to udp socket.
Stream has been or is closed.
State
Stream states.
Definition: io_stream.h:95
Helper class for implementing singletons.
Definition: singleton.h:69
void On_enable() override
Handle processor enabling.
Callback_proxy< void, Stream::Ref, Io_result > Listen_handler
Callback used in Listen()
Definition: socket_processor.h:301
Class for synchronizing with request execution.
Definition: operation_waiter.h:24