# # # patch "netsync.cc" # from [a6fa0d8cf1ad02d3e42aa1a84a3951d686c6b88f] # to [2389c7ac9c2468b5d2f974076204bf2640f1bde7] # ============================================================ --- netsync.cc a6fa0d8cf1ad02d3e42aa1a84a3951d686c6b88f +++ netsync.cc 2389c7ac9c2468b5d2f974076204bf2640f1bde7 @@ -312,8 +312,47 @@ struct netsync_error netsync_error(string const & s): msg(s) {} }; -class session_base +class reactable { + static unsigned int count; +protected: + static unsigned int num_reactables() { return count; } +public: + reactable() { ++count; } + virtual ~reactable() + { + I(count != 0); + --count; + } + + // Handle an I/O event. + virtual bool do_io(Netxx::Probe::ready_type event) = 0; + // Can we timeout after being idle for a long time? + virtual bool can_timeout() = 0; + // Have we been idle for too long? + virtual bool timed_out(time_t now) = 0; + // Do one unit of work. + virtual bool do_work(transaction_guard & guard) = 0; + // Is there any work waiting to be done? + virtual bool arm() = 0; + // Are we a pipe pair (as opposed to a socket)? + // Netxx::PipeCompatibleProbe acts slightly differently, depending. + virtual bool is_pipe_pair() = 0; + // Netxx::Probe::ready() returns sockets, reactor needs to be + // able to map them back to reactables. + virtual vector get_sockets() = 0; + // Netxx::StreamBase and Netxx::StreamServer don't have a + // common base, so we don't have anything we can expose to + // let the reactor add us to the probe itself. + virtual void add_to_probe(Netxx::PipeCompatibleProbe & probe) = 0; + virtual void remove_from_probe(Netxx::PipeCompatibleProbe & probe) = 0; + // Where are we talking to / listening on? + virtual string name() = 0; +}; +unsigned int reactable::count = 0; + +class session_base : public reactable +{ bool read_some(); bool write_some(); void mark_recent_io() @@ -339,8 +378,11 @@ public: } public: string peer_id; + string name() { return peer_id; } +private: shared_ptr str; time_t last_io_time; +public: enum { @@ -365,8 +407,46 @@ public: virtual bool arm() = 0; virtual bool do_work(transaction_guard & guard) = 0; - virtual Netxx::Probe::ready_type which_events(); +private: + Netxx::Probe::ready_type which_events(); +public: virtual bool do_io(Netxx::Probe::ready_type); + bool can_timeout() { return true; } + bool timed_out(time_t now) + { + return static_cast(last_io_time + constants::netsync_timeout_seconds) + < static_cast(now); + } + + bool is_pipe_pair() + { + return str->get_socketfd() == -1; + } + vector get_sockets() + { + vector out; + Netxx::socket_type fd = str->get_socketfd(); + if (fd == -1) + { + shared_ptr pipe = + boost::dynamic_pointer_cast(str); + I(pipe); + out.push_back(pipe->get_readfd()); + out.push_back(pipe->get_writefd()); + } + else + out.push_back(fd); + return out; + } + void add_to_probe(Netxx::PipeCompatibleProbe & probe) + { + probe.add(*str, which_events()); + } + void remove_from_probe(Netxx::PipeCompatibleProbe & probe) + { + I(!is_pipe_pair()); + probe.remove(*str); + } }; Netxx::Probe::ready_type @@ -2607,10 +2687,11 @@ class reactor; class reactor; -class listener_base +class listener_base : public reactable { -public: +protected: shared_ptr srv; +public: listener_base(shared_ptr srv) : srv(srv) { @@ -2619,6 +2700,37 @@ public: { } virtual bool do_io(Netxx::Probe::ready_type event) = 0; + bool timed_out(time_t now) { return false; } + bool do_work(transaction_guard & guard) { return true; } + bool arm() { return false; } + bool can_timeout() { return false; } + + string name() { return ""; } // FIXME + + bool is_pipe_pair() + { + return false; + } + vector get_sockets() + { + return srv->get_probe_info()->get_sockets(); + } + void add_to_probe(Netxx::PipeCompatibleProbe & probe) + { + if (num_reactables() >= constants::netsync_connection_limit) + { + W(F("session limit %d reached, some connections " + "will be refused") % constants::netsync_connection_limit); + } + else + { + probe.add(*srv); + } + } + void remove_from_probe(Netxx::PipeCompatibleProbe & probe) + { + probe.remove(*srv); + } }; class listener : public listener_base @@ -2664,72 +2776,47 @@ class reactor { bool have_pipe; Netxx::Timeout forever, timeout, instant; + bool can_have_timeout; Netxx::PipeCompatibleProbe probe; - set > sessions; - set > listeners; + set > items; - map > session_lookup; - map > listener_lookup; + map > lookup; bool readying; int have_armed; - void ready_for_io(shared_ptr sess, transaction_guard & guard) + void ready_for_io(shared_ptr item, transaction_guard & guard) { - if (sess->do_work(guard)) + if (item->do_work(guard)) { try { - if (sess->arm()) + if (item->arm()) { ++have_armed; } - probe.add(*sess->str, sess->which_events()); - if (sess->str->get_socketfd() == -1) + item->add_to_probe(probe); + vector ss = item->get_sockets(); + for (vector::iterator i = ss.begin(); + i != ss.end(); ++i) { - shared_ptr pipe = - boost::dynamic_pointer_cast(sess->str); - I(pipe); - session_lookup.insert(make_pair(pipe->get_readfd(), - sess)); - session_lookup.insert(make_pair(pipe->get_writefd(), - sess)); + lookup.insert(make_pair(*i, item)); } - else - { - session_lookup.insert(make_pair(sess->str->get_socketfd(), - sess)); - } + if (item->can_timeout()) + can_have_timeout = true; } catch (bad_decode & bd) { W(F("protocol error while processing peer %s: '%s'") - % sess->peer_id % bd.what); - remove(sess); + % item->name() % bd.what); + remove(item); } } else { - remove(sess); + remove(item); } } - void ready_for_io(shared_ptr listen, transaction_guard & guard) - { - if (sessions.size() >= constants::netsync_connection_limit) - { - W(F("session limit %d reached, some connections " - "will be refused") % constants::netsync_connection_limit); - } - else - { - probe.add(*listen->srv); - } - vector ss = listen->srv->get_probe_info()->get_sockets(); - for (vector::iterator i = ss.begin(); i != ss.end(); ++i) - { - listener_lookup.insert(make_pair(*i, listen)); - } - } public: reactor() : have_pipe(false), @@ -2738,68 +2825,47 @@ public: readying(false), have_armed(0) { } - void add(shared_ptr sess, transaction_guard & guard) + void add(shared_ptr item, transaction_guard & guard) { I(!have_pipe); - if (sess->str->get_socketfd() == -1) + if (item->is_pipe_pair()) { - I(sessions.size() == 0); - I(listeners.size() == 0); + I(items.size() == 0); have_pipe = true; } - sessions.insert(sess); + items.insert(item); if (readying) - ready_for_io(sess, guard); + ready_for_io(item, guard); } - void add(shared_ptr listen, transaction_guard & guard) + void remove(shared_ptr item) { - I(!have_pipe); - listeners.insert(listen); - if (readying) - ready_for_io(listen, guard); - } - void remove(shared_ptr sess) - { - set >::iterator i = sessions.find(sess); - if (i != sessions.end()) + set >::iterator i = items.find(item); + if (i != items.end()) { - sessions.erase(i); + items.erase(i); have_pipe = false; } } - void remove(shared_ptr listen) - { - set >::iterator i = listeners.find(listen); - if (i != listeners.end()) - listeners.erase(i); - } int size() const { - return sessions.size() + listeners.size(); + return items.size(); } void ready(transaction_guard & guard) { readying = true; have_armed = 0; + can_have_timeout = false; probe.clear(); - session_lookup.clear(); - set > s_todo = sessions; - for (set >::iterator i = s_todo.begin(); - i != s_todo.end(); ++i) + lookup.clear(); + set > todo = items; + for (set >::iterator i = todo.begin(); + i != todo.end(); ++i) { ready_for_io(*i, guard); } - - listener_lookup.clear(); - set > l_todo = listeners; - for (set >::iterator i = l_todo.begin(); - i != l_todo.end(); ++i) - { - ready_for_io(*i, guard); - } } bool do_io() { @@ -2808,7 +2874,7 @@ public: readying = false; bool timed_out = true; Netxx::Timeout how_long; - if (sessions.empty()) + if (!can_have_timeout) how_long = forever; else if (have_armed > 0) { @@ -2832,31 +2898,24 @@ public: timed_out = false; - map >::iterator s - = session_lookup.find(fd); - map >::iterator l - = listener_lookup.find(fd); - if (s != session_lookup.end()) + map >::iterator r + = lookup.find(fd); + if (r != lookup.end()) { - if (sessions.find(s->second) != sessions.end()) + if (items.find(r->second) != items.end()) { - if (!s->second->do_io(event)) + if (!r->second->do_io(event)) { - remove(s->second); + remove(r->second); } } else { - L(FL("Got i/o on dead peer %s") % s->second->peer_id); + L(FL("Got i/o on dead peer %s") % r->second->name()); } if (!pipe) - probe.remove(*s->second->str); + r->second->remove_from_probe(probe); } - else if (l != listener_lookup.end()) - { - l->second->do_io(event); - probe.remove(*l->second->srv); - } else { L(FL("got woken up for action on unknown fd %d") % fd); @@ -2868,15 +2927,14 @@ public: void prune() { time_t now = ::time(NULL); - set > s_todo = sessions; - for (set >::iterator i = s_todo.begin(); - i != s_todo.end(); ++i) + set > todo = items; + for (set >::iterator i = todo.begin(); + i != todo.end(); ++i) { - if (static_cast((*i)->last_io_time + constants::netsync_timeout_seconds) - < static_cast(now)) + if ((*i)->timed_out(now)) { P(F("peer %s has been idle too long, disconnecting") - % (*i)->peer_id); + % (*i)->name()); remove(*i); } } @@ -2914,7 +2972,7 @@ listener::do_io(Netxx::Probe::ready_type lexical_cast(client), str)); sess->begin_service(); I(guard); - react.add(shared_ptr(sess), *guard); + react.add(sess, *guard); } return true; } @@ -2989,7 +3047,7 @@ call_server(options & opts, info.client.unparsed(), server)); reactor react; - react.add(shared_ptr(sess), guard); + react.add(sess, guard); while (true) { @@ -3117,7 +3175,7 @@ serve_connections(options & opts, shared_ptr listen(new listener(opts, lua, project, keys, react, role, addresses, guard, use_ipv6)); - react.add(shared_ptr(listen), *guard); + react.add(listen, *guard); while (true) @@ -3140,7 +3198,7 @@ serve_connections(options & opts, if (sess) { - react.add(shared_ptr(sess), *guard); + react.add(sess, *guard); L(FL("Opened connection to %s") % sess->peer_id); } } @@ -3168,7 +3226,7 @@ serve_single_connection(project_t & proj transaction_guard guard(project.db); reactor react; - react.add(shared_ptr(sess), guard); + react.add(sess, guard); while (react.size() > 0) {