VSM C++ SDK
Vehicle Specific Modules SDK
socket_processor.h
Go to the documentation of this file.
1 // Copyright (c) 2014, Smart Projects Holdings Ltd
2 // All rights reserved.
3 // See LICENSE file for license details.
4 
10 #ifndef _SOCKET_PROCESSOR_H_
11 #define _SOCKET_PROCESSOR_H_
12 
13 #include <ugcs/vsm/io_request.h>
15 #include <ugcs/vsm/singleton.h>
16 #include <ugcs/vsm/socket_address.h>
17 
18 #include <thread>
19 #include <unordered_map>
20 #include <vector>
21 
22 namespace ugcs {
23 namespace vsm {
24 
26 public:
27  Local_interface(const std::string& name);
28 public:
29  std::string name;
30  bool is_multicast = false;
31  bool is_loopback = false;
32  std::vector<Socket_address::Ptr> adresses;
33 };
34 
37 {
39 public:
46 
47  virtual
49 
51  static Ptr
52  Create();
53 
/**
 * Get global or create new processor instance.
 * All arguments are perfectly forwarded to Get_instance() of the static
 * `singleton` member.
 * @param args Arguments forwarded unchanged to the singleton helper.
 * @return Pointer to the processor instance returned by the singleton helper.
 */
55  template <typename... Args>
56  static Ptr
57  Get_instance(Args &&... args)
58  {
59  return singleton.Get_instance(std::forward<Args>(args)...);
60  }
61 
63  class Stream: public Io_stream
64  {
66  public:
/**
 * Construct a socket stream.
 * @param processor Socket processor; stored in the `processor` member of
 *                  this stream.
 * @param args Forwarded unchanged to the Io_stream base class constructor.
 */
67  template<typename... Args>
68  Stream(Socket_processor::Ptr processor, Args&& ...args):
69  Io_stream(std::forward<Args>(args)...),
70  processor(processor)
71  {}
72 
74 
75  ~Stream();
76 
77  void
78  Set_state(Io_stream::State state);
79 
80  void
81  Set_connect_request(Io_request::Ptr request);
82 
83  void
84  Close_socket();
85 
87  Get_connect_request();
88 
89  void
90  Abort_pending_requests(Io_result result = Io_result::CLOSED);
91 
94 
99  virtual Operation_waiter
100  Read_from(
101  size_t max_to_read,
102  Read_from_handler completion_handler,
104 
107  virtual Operation_waiter
108  Write_to(
110  Socket_address::Ptr dest_addr,
111  Write_handler completion_handler,
113 
114  /* Returns a newly created Socket_address::Ptr with peer address. */
115  Socket_address::Ptr
116  Get_peer_address();
117 
118  /* Set the peer address for a UDP stream.
119  * All subsequent Write() calls will go to this address. */
120  void
121  Set_peer_address(Socket_address::Ptr);
122 
123  /* Returns a newly created Socket_address::Ptr with local address. */
124  Socket_address::Ptr
125  Get_local_address();
126 
127  bool
128  Add_multicast_group(Socket_address::Ptr interface, Socket_address::Ptr multicast);
129 
130  bool
131  Remove_multicast_group(Socket_address::Ptr interface, Socket_address::Ptr multicast);
132 
133  /* Enable/disable sending of broadcast packets.
134  * Valid for SOCK_DGRAM streams only
135  * @param enable true - enable sending of broadcast packets
136  * false - disable sending of broadcast packets
137  * @return true on success */
138  bool
139  Enable_broadcast(bool enable);
140 
141  typedef std::unique_ptr<std::vector<uint8_t>> Buf_ptr;
142 
143  private:
144 
145  template<typename T>
146  class Circular_buffer
147  {
148  public:
149  bool
150  Push(T&& item){
151  bool ret = true;
152  if (writer == reader) {
153  if (is_empty) {
154  if (buffer.size() < MAX_CACHED_COUNT) {
155  buffer.resize(MAX_CACHED_COUNT);
156  }
157  is_empty = false;
158  } else {
159  // buffer full.
160  reader++;
161  if (reader == buffer.size()) {
162  reader = 0;
163  }
164  ret = true;
165  }
166  }
167  buffer[writer] = std::move(item);
168  writer++;
169  if (writer == buffer.size()) {
170  writer = 0;
171  }
172  return ret;
173  };
174 
175  bool
176  Pull(T& ret)
177  {
178  if (is_empty) {
179  return false;
180  }
181  ret = std::move(buffer[reader]);
182  reader++;
183  if (reader == buffer.size()) {
184  reader = 0;
185  }
186  if (reader == writer) {
187  is_empty = true;
188  }
189  return true;
190  };
191  bool
192  Is_empty()
193  {
194  return is_empty;
195  }
196  void
197  Clear()
198  {
199  writer = 0;
200  reader = 0;
201  is_empty = true;
202  buffer.clear();
203  }
204  private:
205  size_t writer = 0;
206  size_t reader = 0;
207  std::vector<T> buffer;
208  bool is_empty = true;
209  };
210 
211  Socket_address::Ptr peer_address = nullptr;
212 
213  Socket_address::Ptr local_address = nullptr;
214 
215  sockets::Socket_handle s = INVALID_SOCKET;
216 
217  Socket_processor::Ptr processor;
218 
219  Io_request::Ptr connect_request;
220 
221  // true if socket was connected using connect() call.
222  // I.e. no need to specify destination when doing send().
223  // Applies to both SOCK_DGRAM and SOCK_STREAM
224  bool is_connected = false;
225 
226  typedef std::pair<Write_request::Ptr, Socket_address::Ptr> Write_requests_entry;
227  std::list<Write_requests_entry> write_requests;
228 
229  typedef std::pair<Read_request::Ptr, Socket_address::Ptr> Read_requests_entry;
230  std::list<Read_requests_entry> read_requests;
231 
232  std::list<Io_request::Ptr> accept_requests;
233 
234  Buf_ptr reading_buffer;
235  size_t read_bytes = 0; // bytes read by current read request
236  size_t written_bytes = 0; // bytes written by current write request
237 
238  // UDP multi-stream specific stuff.
239  typedef std::pair<Buf_ptr, Socket_address::Ptr> Cache_entry;
240  // Accepted UDP streams for this stream/socket.
241  std::unordered_map<Socket_address::Ptr, Stream::Ptr> substreams;
242  // If present then this is a substream of another stream.
243  Stream::Ptr parent_stream = nullptr;
244  // Packet cache. Keeps unread packets until Read called.
245  Circular_buffer<Cache_entry> packet_cache;
246  // Maximum number of packets the stream will cache.
247  // When the cache is full, the oldest cached packet is dropped to make room for a new one.
248  static constexpr size_t MAX_CACHED_COUNT = 50;
249 
250  friend class Socket_processor;
251 
252  sockets::Socket_handle
253  Get_socket();
254 
255  void
256  Set_socket(sockets::Socket_handle s);
257 
259  void
260  Update_name();
261 
263  virtual Operation_waiter
264  Write_impl(Io_buffer::Ptr buffer,
265  Offset offset,
266  Write_handler completion_handler,
268 
270  virtual Operation_waiter
271  Read_impl(size_t max_to_read, size_t min_to_read, Offset offset,
272  Read_handler completion_handler,
273  Request_completion_context::Ptr comp_ctx) override;
274 
276  virtual Operation_waiter
277  Close_impl(Close_handler completion_handler,
278  Request_completion_context::Ptr comp_ctx) override;
279 
280  void
281  Process_udp_read_requests();
282  };
283 
286 
289  typedef Callback_proxy<
290  void, // return void
291  std::string, // host name as passed to Get_addr_info() call
292  std::string, // service as passed to Get_addr_info() call
293  std::list<addrinfo>, // list of addrinfo returned from Get_addr_info()
294  Io_result // result of operation: OK|TIMED_OUT|BAD_ADDRESS|CANCELLED
296 
299  typedef Callback_proxy<
300  void, // return void
301  Stream::Ref, // stream
302  Io_result // result of operation: OK|TIMED_OUT|BAD_ADDRESS|CANCELLED
304 
308 
310  Connect(std::string host, std::string service,
311  Connect_handler completion_handler,
313  Io_stream::Type sock_type = Io_stream::Type::TCP,
314  Socket_address::Ptr src_addr = nullptr)
315  {
316  return Connect(
317  Socket_address::Create(host, service),
318  completion_handler,
319  completion_context,
320  sock_type,
321  src_addr);
322  }
323 
325  Connect(Socket_address::Ptr dest_addr,
326  Connect_handler completion_handler,
328  Io_stream::Type sock_type = Io_stream::Type::TCP,
329  Socket_address::Ptr src_addr = nullptr);
330 
343  template <class Callback_ptr>
346  Callback_ptr completion_handler,
348  {
349  Callback_check_type<Callback_ptr, void, Stream::Ref, Io_result>();
350 
351  return Accept_impl(listener, completion_handler, completion_context,
352  completion_handler->template Get_arg<0>(),
353  completion_handler->template Get_arg<1>());
354  }
355 
362  Get_addr_info(
363  const std::string& host,
364  const std::string& service,
365  addrinfo* hints,
366  Get_addr_info_handler completion_handler,
368 
370  Listen(
371  const std::string& host,
372  const std::string& service,
373  Listen_handler completion_handler,
375  Io_stream::Type sock_type = Io_stream::Type::TCP)
376  {
377  return Listen(Socket_address::Create(host, service), completion_handler, completion_context, sock_type);
378  }
379 
381  Listen(
382  Socket_address::Ptr addr,
383  Listen_handler completion_handler,
385  Io_stream::Type sock_type = Io_stream::Type::TCP);
386 
404  Socket_address::Ptr addr,
405  Listen_handler completion_handler,
407  bool multicast = false)
408  {
409  return Listen(addr, completion_handler, completion_context, multicast?Io_stream::Type::UDP_MULTICAST:Io_stream::Type::UDP);
410  }
411 
433  Bind_can(
434  std::string interface,
435  std::vector<int> filter_messges,
436  Listen_handler completion_handler,
438 
439  static std::list<Local_interface>
440  Enumerate_local_interfaces();
441 
442 protected:
443 
445  std::thread thread;
446 
447  Piped_request_waiter::Ptr piped_waiter;
448 
453 
455  virtual void
456  On_enable() override;
457 
459  virtual void
460  On_disable() override;
461 
463  void
464  Process_on_disable(Request::Ptr);
465 
466  virtual void
467  On_wait_and_process() override;
468 
469  void
470  On_write(Write_request::Ptr request, Socket_address::Ptr addr = nullptr);
471 
472  void
473  On_read(Read_request::Ptr request, Socket_address::Ptr addr = nullptr);
474 
475  void
476  On_close(Io_request::Ptr request);
477 
478  void
479  On_set_peer_address(Io_request::Ptr request, Socket_address::Ptr addr);
480 
481  void
482  On_connect(Io_request::Ptr request, Stream::Ptr stream);
483 
484  void
485  On_listen(Io_request::Ptr request, Stream::Ptr stream, Socket_address::Ptr addr);
486 
487  void
488  On_bind_can(
489  Io_request::Ptr request,
490  std::vector<int> filter_messges,
491  Stream::Ptr stream,
492  std::string iface_id);
493 
494  void
495  On_accept(Io_request::Ptr request, Stream::Ptr stream, Stream::Ref listenstream);
496 
497  void
498  On_cancel(Io_request::Ptr request, Io_request::Ptr request_to_cancel);
499 
500  void
501  On_get_addr_info(Io_request::Ptr request, Get_addr_info_handler handler);
502 
504  Accept_impl(Socket_listener::Ref listener,
505  Request::Handler completion_handler,
506  Request_completion_context::Ptr completion_context,
507  Stream::Ref& stream_arg, Io_result& result_arg);
508 
509 private:
513  // std::mutex streams_lock;
514 
515  typedef std::unordered_map<Io_stream::Ptr, Stream::Ptr> Streams_map;
516 
517  Streams_map streams;
518 
520  static Singleton<Socket_processor> singleton;
521 
522  Stream::Ptr Lookup_stream(Io_stream::Ptr io_stream);
523 
/**
 * Result codes used internally by the socket processor.
 * NOTE(review): the code that produces and consumes these values is not
 * part of this header; the per-value notes below are assumptions derived
 * from the names only - confirm against the implementation file.
 */
524  enum class RW_socket_result {
525  DONE, // presumably: the read/write operation finished completely
526  PARTIAL, // presumably: only part of the data was transferred so far
527  CLOSED // presumably: the socket was found closed during the operation
528  };
529 
530  void
531  Handle_select_accept(Stream::Ptr stream);
532 
533  void
534  Handle_select_connect(Stream::Ptr stream);
535 
536  void
537  Handle_write_requests(Stream::Ptr stream);
538 
539  void
540  Handle_read_requests(Stream::Ptr stream);
541 
542  void
543  Handle_udp_read_requests(Stream::Ptr stream);
544 
546  void
547  Close_stream(Stream::Ptr stream, bool remove_from_streams = true);
548 
549  void
550  Cancel_operation(Io_request::Ptr request_to_cancel);
551 
553  bool
554  Check_for_cancel_request(Io_request::Ptr request, bool force_cancel);
555 };
556 
557 // @{
560 typedef Socket_processor::Socket_listener Socket_listener;
561 // @}
562 
563 // Supported UDP packet size. Used as default with UDP sockets for reading.
564 // Read on UDP socket must reserve space for maximum payload it can possibly receive.
565 // Theoretical max is 64K which will always be fragmented.
566 // We use what is typically the largest unfragmented packet.
567 // This number is the Ethernet II MTU (1500) minus the maximum IPv4 header (60) minus the UDP header (8).
568 static constexpr int MIN_UDP_PAYLOAD_SIZE_TO_READ = 1500 - 60 - 8;
569 
572  Make_socket_connect_callback,
574  (nullptr, Io_result::OTHER_FAILURE))
575 
576 
578  Make_socket_listen_callback,
579  (Socket_listener::Ref, Io_result),
580  (nullptr, Io_result::OTHER_FAILURE))
581 
584  Make_socket_accept_callback,
585  (Socket_stream::Ref, Io_result),
586  (nullptr, Io_result::OTHER_FAILURE))
587 
590  Make_socket_read_from_callback,
591  (Io_buffer::Ptr, Io_result, Socket_address::Ptr),
592  (nullptr, Io_result::OTHER_FAILURE, nullptr))
593 
596  Make_socket_read_callback,
597  (Io_buffer::Ptr, Io_result),
598  (nullptr, Io_result::OTHER_FAILURE))
599 
600 } /* namespace vsm */
601 } /* namespace ugcs */
602 
603 #endif /* _SOCKET_PROCESSOR_H_ */
UGCS root namespace.
Definition: android-linux/ugcs/vsm/platform_sockets.h:27
static Ptr Get_instance(Args &&...args)
Get global or create new processor instance.
Definition: socket_processor.h:57
Singleton class definition.
#define DEFINE_CALLBACK_BUILDER(__name, __types, __values)
Define callback builder function.
Definition: callback.h:42
int64_t Offset
Offset for read/write operations.
Definition: io_stream.h:62
std::shared_ptr< Socket_processor > Ptr
Pointer type.
Definition: socket_processor.h:38
static Ptr Create(Args &&...args)
Create an instance.
Definition: request_temp_completion_context.h:19
STL namespace.
Callback_base< void >::Ptr<> Handler
Callback denoting a handler of the request.
Definition: request_container.h:85
Definition: socket_processor.h:25
Io_result
Result of I/O operation.
Definition: io_stream.h:25
std::shared_ptr< Stream > Ptr
Pointer type.
Definition: socket_processor.h:65
Socket processor.
Definition: socket_processor.h:36
std::shared_ptr< Request > Ptr
Pointer type.
Definition: request_container.h:38
Abstract I/O stream interface.
Definition: io_stream.h:54
Socket specific stream.
Definition: socket_processor.h:63
std::shared_ptr< Io_request > Ptr
Pointer type.
Definition: io_request.h:22
Request execution context.
Definition: request_context.h:24
Some other system failure.
std::thread thread
Worker thread of socket processor.
Definition: socket_processor.h:445
Listen_handler Connect_handler
Callback used in Connect() is the same as for listen handler.
Definition: socket_processor.h:307
I/O request declaration.
Helper class for proxying callback invocation.
Definition: callback.h:691
Socket_processor::Stream Socket_stream
Convenience types aliases.
Definition: socket_processor.h:559
std::shared_ptr< Request_container > Ptr
Pointer type.
Definition: request_container.h:32
Request_completion_context::Ptr completion_ctx
Default completion context handled by processor thread is used if user did not specify one...
Definition: socket_processor.h:452
Generic I/O buffer.
Definition: io_buffer.h:33
std::shared_ptr< Read_request > Ptr
Shared pointer to read request.
Definition: io_request.h:148
Callback_proxy< void, std::string, std::string, std::list< addrinfo >, Io_result > Get_addr_info_handler
Callback used in Get_addr_info()
Definition: socket_processor.h:295
Callback_proxy< void, Io_buffer::Ptr, Io_result, Socket_address::Ptr > Read_from_handler
Default prototype for read operation completion handler.
Definition: socket_processor.h:93
Generic container for queued requests.
Definition: request_container.h:31
Type
Stream types.
Definition: io_stream.h:96
std::shared_ptr< Write_request > Ptr
Shared pointer to write request.
Definition: io_request.h:112
std::shared_ptr< Piped_request_waiter > Ptr
Pointer type.
Definition: piped_request_waiter.h:22
std::shared_ptr< Request_context > Ptr
Pointer type.
Definition: request_context.h:25
std::shared_ptr< Io_buffer > Ptr
Pointer type.
Definition: io_buffer.h:34
Operation_waiter Accept(Socket_listener::Ref listener, Callback_ptr completion_handler, Request_completion_context::Ptr completion_context=Request_temp_completion_context::Create())
Accept incoming TCP/UDP connection.
Definition: socket_processor.h:345
#define DEFINE_COMMON_CLASS(__class_name,...)
Use this macro to define some common attributes for a class.
Definition: utils.h:25
std::shared_ptr< Io_stream > Ptr
Pointer type.
Definition: io_stream.h:55
Request waiter which uses a pipe to signal about request submissions.
Definition: piped_request_waiter.h:21
Operation_waiter Bind_udp(Socket_address::Ptr addr, Listen_handler completion_handler, Request_completion_context::Ptr completion_context=Request_temp_completion_context::Create(), bool multicast=false)
Create a local UDP endpoint socket and associate a stream with it.
Definition: socket_processor.h:403
Stream Socket_listener
Stream type is used for listener socket type also.
Definition: socket_processor.h:285
Stream has been or is closed.
State
Stream states.
Definition: io_stream.h:82
Helper class for implementing singletons.
Definition: singleton.h:69
Callback_proxy< void, Stream::Ref, Io_result > Listen_handler
Callback used in Listen()
Definition: socket_processor.h:303
Class for synchronizing with request execution.
Definition: operation_waiter.h:24