[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-go] branch master updated: Handle DHT lookup aborts.
From: |
gnunet |
Subject: |
[gnunet-go] branch master updated: Handle DHT lookup aborts. |
Date: |
Tue, 31 Mar 2020 13:19:02 +0200 |
This is an automated email from the git hooks/post-receive script.
bernd-fix pushed a commit to branch master
in repository gnunet-go.
The following commit(s) were added to refs/heads/master by this push:
new 2a2a558 Handle DHT lookup aborts.
2a2a558 is described below
commit 2a2a558aa553aca3394513114d13d4888ae822db
Author: Bernd Fix <address@hidden>
AuthorDate: Tue Mar 31 13:12:30 2020 +0200
Handle DHT lookup aborts.
---
src/cmd/gnunet-service-gns-go/main.go | 2 +
src/cmd/peer_mockup/process.go | 26 +++---
src/gnunet/go.mod | 4 +-
src/gnunet/go.sum | 5 ++
src/gnunet/message/factory.go | 2 +
src/gnunet/message/msg_dht.go | 37 ++++++++
src/gnunet/modules.go | 11 ++-
src/gnunet/service/client.go | 35 +++++---
src/gnunet/service/context.go | 68 +++++++++++++++
src/gnunet/service/{namecache => dht}/module.go | 19 +++--
src/gnunet/service/gns/dns.go | 13 ++-
src/gnunet/service/gns/module.go | 109 +++++++++++++++++++-----
src/gnunet/service/gns/service.go | 71 +++++++++++----
src/gnunet/service/namecache/module.go | 5 +-
src/gnunet/service/service.go | 52 ++++++++---
src/gnunet/transport/channel.go | 32 +++++--
src/gnunet/transport/channel_netw.go | 92 ++++++++++++++++++--
src/gnunet/transport/connection.go | 10 ++-
src/gnunet/util/misc.go | 1 +
src/gnunet/util/time.go | 19 +++++
20 files changed, 510 insertions(+), 103 deletions(-)
diff --git a/src/cmd/gnunet-service-gns-go/main.go
b/src/cmd/gnunet-service-gns-go/main.go
index 5f4062a..2815eaa 100644
--- a/src/cmd/gnunet-service-gns-go/main.go
+++ b/src/cmd/gnunet-service-gns-go/main.go
@@ -89,6 +89,8 @@ loop:
break loop
case syscall.SIGHUP:
logger.Println(logger.INFO, "[gns] SIGHUP")
+ case syscall.SIGURG:
+ // TODO:
https://github.com/golang/go/issues/37942
default:
logger.Println(logger.INFO, "[gns] Unhandled
signal: "+sig.String())
}
diff --git a/src/cmd/peer_mockup/process.go b/src/cmd/peer_mockup/process.go
index a26177b..510fc48 100644
--- a/src/cmd/peer_mockup/process.go
+++ b/src/cmd/peer_mockup/process.go
@@ -9,6 +9,12 @@ import (
"gnunet/message"
"gnunet/transport"
"gnunet/util"
+
+ "github.com/bfix/gospel/concurrent"
+)
+
+var (
+ sig = concurrent.NewSignaller()
)
func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) {
@@ -20,7 +26,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer)
(err error) {
in := make(chan message.Message)
go func() {
for {
- msg, err := c.Receive()
+ msg, err := c.Receive(sig)
if err != nil {
fmt.Printf("Receive: %s\n", err.Error())
return
@@ -33,7 +39,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer)
(err error) {
init := (from == p)
if init {
peerid := util.NewPeerID(p.GetID())
- c.Send(message.NewTransportTcpWelcomeMsg(peerid))
+ c.Send(message.NewTransportTcpWelcomeMsg(peerid), sig)
}
// remember peer addresses (only ONE!)
@@ -53,11 +59,11 @@ func process(ch *transport.MsgChannel, from, to *core.Peer)
(err error) {
case *message.TransportTcpWelcomeMsg:
peerid := util.NewPeerID(p.GetID())
if init {
- c.Send(message.NewHelloMsg(peerid))
+ c.Send(message.NewHelloMsg(peerid), sig)
target := util.NewPeerID(t.GetID())
-
c.Send(message.NewTransportPingMsg(target, tAddr))
+
c.Send(message.NewTransportPingMsg(target, tAddr), sig)
} else {
-
c.Send(message.NewTransportTcpWelcomeMsg(peerid))
+
c.Send(message.NewTransportTcpWelcomeMsg(peerid), sig)
}
case *message.HelloMsg:
@@ -67,7 +73,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer)
(err error) {
if err := mOut.Sign(p.PrvKey()); err != nil {
return err
}
- c.Send(mOut)
+ c.Send(mOut, sig)
case *message.TransportPongMsg:
rc, err := msg.Verify(t.PubKey())
@@ -79,14 +85,14 @@ func process(ch *transport.MsgChannel, from, to *core.Peer)
(err error) {
}
send[message.TRANSPORT_PONG] = true
if mOut, ok :=
pending[message.TRANSPORT_SESSION_SYN]; ok {
- c.Send(mOut)
+ c.Send(mOut, sig)
}
case *message.SessionSynMsg:
mOut := message.NewSessionSynAckMsg()
mOut.Timestamp = msg.Timestamp
if send[message.TRANSPORT_PONG] {
- c.Send(mOut)
+ c.Send(mOut, sig)
} else {
pending[message.TRANSPORT_SESSION_SYN]
= mOut
}
@@ -97,7 +103,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer)
(err error) {
case *message.SessionAckMsg:
case *message.SessionKeepAliveMsg:
-
c.Send(message.NewSessionKeepAliveRespMsg(msg.Nonce))
+
c.Send(message.NewSessionKeepAliveRespMsg(msg.Nonce), sig)
case *message.EphemeralKeyMsg:
rc, err := msg.Verify(t.PubKey())
@@ -108,7 +114,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer)
(err error) {
return errors.New("EPHKEY verification
failed")
}
t.SetEphKeyMsg(msg)
- c.Send(p.EphKeyMsg())
+ c.Send(p.EphKeyMsg(), sig)
secret := crypto.SharedSecret(p.EphPrvKey(),
t.EphKeyMsg().Public())
c.SharedSecret(util.Clone(secret.Bits[:]))
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index 83720fd..443666f 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -3,7 +3,7 @@ module gnunet
go 1.12
require (
- github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449
+ github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46
github.com/miekg/dns v1.1.26
- golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392
+ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
)
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index e9ece97..8c956b4 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -1,5 +1,7 @@
github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449
h1:oIq3s14sMh1sq791v9VpR+GJvhVGbvuOWlfTjruRTDQ=
github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449/go.mod
h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA=
+github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46
h1:5aNd1/ISbO1ltgmyUGza7kdaN4fD/Qal6uKZk9goMhw=
+github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46/go.mod
h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA=
github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU=
github.com/miekg/dns v1.1.26/go.mod
h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -7,6 +9,8 @@ golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472
h1:Gv7RPwsi3eZ2Fgewe3CBsu
golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472/go.mod
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392
h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M=
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod
h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
+golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
+golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod
h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478
h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
@@ -19,6 +23,7 @@ golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod
h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe
h1:6fAMxZRR6sl1Uq8U61gxU+kPTs2tR8uOySCbBP7BN/M=
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
diff --git a/src/gnunet/message/factory.go b/src/gnunet/message/factory.go
index e6e6e73..33e806b 100644
--- a/src/gnunet/message/factory.go
+++ b/src/gnunet/message/factory.go
@@ -59,6 +59,8 @@ func NewEmptyMessage(msgType uint16) (Message, error) {
//------------------------------------------------------------------
case DHT_CLIENT_GET:
return NewDHTClientGetMsg(nil), nil
+ case DHT_CLIENT_GET_STOP:
+ return NewDHTClientGetStopMsg(nil), nil
case DHT_CLIENT_RESULT:
return NewDHTClientResultMsg(nil), nil
diff --git a/src/gnunet/message/msg_dht.go b/src/gnunet/message/msg_dht.go
index 925cb71..62a233f 100644
--- a/src/gnunet/message/msg_dht.go
+++ b/src/gnunet/message/msg_dht.go
@@ -126,3 +126,40 @@ func (m *DHTClientResultMsg) String() string {
func (msg *DHTClientResultMsg) Header() *MessageHeader {
return &MessageHeader{msg.MsgSize, msg.MsgType}
}
+
+//----------------------------------------------------------------------
+// DHT_CLIENT_GET_STOP
+//----------------------------------------------------------------------
+
+// DHTClientGetStopMsg
+type DHTClientGetStopMsg struct {
+ MsgSize uint16 `order:"big"` // total size of message
+ MsgType uint16 `order:"big"` // DHT_CLIENT_GET_STOP (144)
+ Reserved uint32 `order:"big"` // Reserved (0)
+ Id uint64 `order:"big"` // Unique ID identifying this
request
+ Key *crypto.HashCode // The key to search for
+}
+
+// NewDHTClientGetStopMsg creates a new default DHTClientGetStopMsg object.
+func NewDHTClientGetStopMsg(key *crypto.HashCode) *DHTClientGetStopMsg {
+ if key == nil {
+ key = new(crypto.HashCode)
+ }
+ return &DHTClientGetStopMsg{
+ MsgSize: 80,
+ MsgType: DHT_CLIENT_GET_STOP,
+ Reserved: 0, // mandatory
+ Id: 0,
+ Key: key,
+ }
+}
+
+// String returns a human-readable representation of the message.
+func (m *DHTClientGetStopMsg) String() string {
+ return fmt.Sprintf("DHTClientGetStopMsg{Id:%d,Key=%s}", m.Id,
hex.EncodeToString(m.Key.Bits))
+}
+
+// Header returns the message header in a separate instance.
+func (msg *DHTClientGetStopMsg) Header() *MessageHeader {
+ return &MessageHeader{msg.MsgSize, msg.MsgType}
+}
diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go
index bc2993e..b71eb40 100644
--- a/src/gnunet/modules.go
+++ b/src/gnunet/modules.go
@@ -29,6 +29,7 @@
package gnunet
import (
+ "gnunet/service/dht"
"gnunet/service/gns"
"gnunet/service/namecache"
)
@@ -37,6 +38,7 @@ import (
type Instances struct {
GNS *gns.GNSModule
Namecache *namecache.NamecacheModule
+ DHT *dht.DHTModule
}
// Local reference to instance list
@@ -50,9 +52,14 @@ func init() {
// Namecache (no calls to other modules)
Modules.Namecache = new(namecache.NamecacheModule)
+ // DHT (no calls to other modules)
+ Modules.DHT = new(dht.DHTModule)
+
// GNS (calls Namecache, DHT and Identity)
Modules.GNS = &gns.GNSModule{
- LookupLocal: Modules.Namecache.Get,
- StoreLocal: Modules.Namecache.Put,
+ LookupLocal: Modules.Namecache.Get,
+ StoreLocal: Modules.Namecache.Put,
+ LookupRemote: Modules.DHT.Get,
+ CancelRemote: Modules.DHT.Cancel,
}
}
diff --git a/src/gnunet/service/client.go b/src/gnunet/service/client.go
index 1e54c2d..fe3fa9e 100644
--- a/src/gnunet/service/client.go
+++ b/src/gnunet/service/client.go
@@ -25,36 +25,49 @@ import (
"github.com/bfix/gospel/logger"
)
-// Client
+// Client type: Use to perform client-side interactions with GNUnet services.
type Client struct {
- ch *transport.MsgChannel
+ ch *transport.MsgChannel // channel for message exchange
}
-// NewClient
+// NewClient creates a new client instance for the given channel endpoint.
func NewClient(endp string) (*Client, error) {
- //
+ // create a new channel to endpoint.
ch, err := transport.NewChannel(endp)
if err != nil {
return nil, err
}
+ // wrap into a message channel for the client.
return &Client{
ch: transport.NewMsgChannel(ch),
}, nil
}
-func (c *Client) SendRequest(req message.Message) error {
- return c.ch.Send(req)
+// SendRequest sends a given message to the service.
+func (c *Client) SendRequest(ctx *SessionContext, req message.Message) error {
+ return c.ch.Send(req, ctx.Signaller())
}
-func (c *Client) ReceiveResponse() (message.Message, error) {
- return c.ch.Receive()
+// ReceiveResponse waits for a response from the service; it can be interrupted
+// via the signaller of the session context.
+func (c *Client) ReceiveResponse(ctx *SessionContext) (message.Message, error)
{
+ return c.ch.Receive(ctx.Signaller())
}
+// Close a client; no further message exchange is possible.
func (c *Client) Close() error {
return c.ch.Close()
}
-func ServiceRequestResponse(caller, callee, endp string, req message.Message)
(message.Message, error) {
+// ServiceRequestResponse is a helper method for one request - one response
+// scenarios of client/service interactions.
+func ServiceRequestResponse(
+ ctx *SessionContext,
+ caller string,
+ callee string,
+ endp string,
+ req message.Message) (message.Message, error) {
+
// client-connect to the service
logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller,
callee)
cl, err := NewClient(endp)
@@ -63,13 +76,13 @@ func ServiceRequestResponse(caller, callee, endp string,
req message.Message) (m
}
// send request
logger.Printf(logger.DBG, "[%s] Sending request to %s service\n",
caller, callee)
- if err = cl.SendRequest(req); err != nil {
+ if err = cl.SendRequest(ctx, req); err != nil {
return nil, err
}
// wait for a single response, then close the connection
logger.Printf(logger.DBG, "[%s] Waiting for response from %s
service\n", caller, callee)
var resp message.Message
- if resp, err = cl.ReceiveResponse(); err != nil {
+ if resp, err = cl.ReceiveResponse(ctx); err != nil {
return nil, err
}
logger.Printf(logger.DBG, "[%s] Closing connection to %s service\n",
caller, callee)
diff --git a/src/gnunet/service/context.go b/src/gnunet/service/context.go
new file mode 100644
index 0000000..4896bd5
--- /dev/null
+++ b/src/gnunet/service/context.go
@@ -0,0 +1,68 @@
+// This file is part of gnunet-go, a GNUnet-implementation in Golang.
+// Copyright (C) 2019, 2020 Bernd Fix >Y<
+//
+// gnunet-go is free software: you can redistribute it and/or modify it
+// under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// gnunet-go is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+// SPDX-License-Identifier: AGPL3.0-or-later
+
+package service
+
+import (
+ "sync"
+
+ "gnunet/util"
+
+ "github.com/bfix/gospel/concurrent"
+)
+
+// SessionContext is used to set a context for each client connection handled
+// by a service; the session is handled by the 'ServeClient' method of a
+// service implementation.
+type SessionContext struct {
+ Id int // session identifier
+ wg *sync.WaitGroup // wait group for the session
+ sig *concurrent.Signaller // signaller for the session
+}
+
+// NewSessionContext instantiates a new session context.
+func NewSessionContext() *SessionContext {
+ return &SessionContext{
+ Id: util.NextID(),
+ wg: new(sync.WaitGroup),
+ sig: concurrent.NewSignaller(),
+ }
+}
+
+// Cancel all go-routines associated with this context.
+func (ctx *SessionContext) Cancel() {
+ // send signal to terminate...
+ ctx.sig.Send(true)
+ // wait for session go-routines to finish
+ ctx.wg.Wait()
+}
+
+// Add a go-routine to the wait group.
+func (ctx *SessionContext) Add() {
+ ctx.wg.Add(1)
+}
+
+// Remove a go-routine from the wait group.
+func (ctx *SessionContext) Remove() {
+ ctx.wg.Done()
+}
+
+// Signaller returns the working instance for the context.
+func (ctx *SessionContext) Signaller() *concurrent.Signaller {
+ return ctx.sig
+}
diff --git a/src/gnunet/service/namecache/module.go
b/src/gnunet/service/dht/module.go
similarity index 71%
copy from src/gnunet/service/namecache/module.go
copy to src/gnunet/service/dht/module.go
index 6a10625..a54d0eb 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -16,29 +16,34 @@
//
// SPDX-License-Identifier: AGPL3.0-or-later
-package namecache
+package dht
import (
"gnunet/message"
+ "gnunet/service"
"gnunet/service/gns"
)
//======================================================================
-// "GNS name cache" implementation
+// "DHT" implementation
//======================================================================
//----------------------------------------------------------------------
-// Put and get GNS blocks into/from a cache (transient storage)
+// Put and get blocks into/from a DHT.
//----------------------------------------------------------------------
-// Namecache handles the transient storage of GNS blocks under the query key.
-type NamecacheModule struct {
+// DHT handles the permanent storage of blocks under the query key.
+type DHTModule struct {
}
-func (nc *NamecacheModule) Get(query *gns.Query) (*message.GNSBlock, error) {
+func (nc *DHTModule) Get(ctx *service.SessionContext, query *gns.Query)
(*message.GNSBlock, error) {
return nil, nil
}
-func (nc *NamecacheModule) Put(block *message.GNSBlock) error {
+func (nc *DHTModule) Cancel(ctx *service.SessionContext, query *gns.Query)
error {
+ return nil
+}
+
+func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock)
error {
return nil
}
diff --git a/src/gnunet/service/gns/dns.go b/src/gnunet/service/gns/dns.go
index 28e9813..7a9a30c 100644
--- a/src/gnunet/service/gns/dns.go
+++ b/src/gnunet/service/gns/dns.go
@@ -26,6 +26,7 @@ import (
"gnunet/enums"
"gnunet/message"
+ "gnunet/service"
"gnunet/util"
"github.com/bfix/gospel/crypto/ed25519"
@@ -202,10 +203,16 @@ func QueryDNS(id int, name string, server net.IP, kind
RRTypeList) *message.GNSR
// ResolveDNS resolves a name in DNS. Multiple DNS servers are queried in
// parallel; the first result delivered by any of the servers is returned
// as the result list of matching resource records.
-func (gns *GNSModule) ResolveDNS(name string, servers []string, kind
RRTypeList, pkey *ed25519.PublicKey, depth int) (set *message.GNSRecordSet, err
error) {
- logger.Printf(logger.DBG, "[dns] Resolution of '%s' starting...\n",
name)
+func (gns *GNSModule) ResolveDNS(
+ ctx *service.SessionContext,
+ name string,
+ servers []string,
+ kind RRTypeList,
+ pkey *ed25519.PublicKey,
+ depth int) (set *message.GNSRecordSet, err error) {
// start DNS queries concurrently
+ logger.Printf(logger.DBG, "[dns] Resolution of '%s' starting...\n",
name)
res := make(chan *message.GNSRecordSet)
running := 0
for _, srv := range servers {
@@ -215,7 +222,7 @@ func (gns *GNSModule) ResolveDNS(name string, servers
[]string, kind RRTypeList,
if addr == nil {
// no, it is a name... try to resolve an IP address
from the name
query := NewRRTypeList(enums.GNS_TYPE_DNS_A,
enums.GNS_TYPE_DNS_AAAA)
- if set, err = gns.ResolveUnknown(srv, nil, pkey, query,
depth+1); err != nil {
+ if set, err = gns.ResolveUnknown(ctx, srv, nil, pkey,
query, depth+1); err != nil {
logger.Printf(logger.ERROR, "[dns] Can't
resolve NS server '%s': %s\n", srv, err.Error())
continue
}
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index d067633..43f7b7a 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -26,6 +26,8 @@ import (
"gnunet/crypto"
"gnunet/enums"
"gnunet/message"
+ "gnunet/service"
+ "gnunet/transport"
"gnunet/util"
"github.com/bfix/gospel/crypto/ed25519"
@@ -110,15 +112,22 @@ func NewQuery(pkey *ed25519.PublicKey, label string)
*Query {
// GNSModule handles the resolution of GNS names to RRs bundled in a block.
type GNSModule struct {
// Use function references for calls to methods in other modules:
- //
- LookupLocal func(query *Query) (*message.GNSBlock, error)
- StoreLocal func(block *message.GNSBlock) error
- LookupRemote func(query *Query) (*message.GNSBlock, error)
+ LookupLocal func(ctx *service.SessionContext, query *Query)
(*message.GNSBlock, error)
+ StoreLocal func(ctx *service.SessionContext, block *message.GNSBlock)
error
+ LookupRemote func(ctx *service.SessionContext, query *Query)
(*message.GNSBlock, error)
+ CancelRemote func(ctx *service.SessionContext, query *Query) error
}
// Resolve a GNS name with multiple labels. If pkey is not nil, the name
// is interpreted as "relative to current zone".
-func (gns *GNSModule) Resolve(path string, pkey *ed25519.PublicKey, kind
RRTypeList, mode int, depth int) (set *message.GNSRecordSet, err error) {
+func (gns *GNSModule) Resolve(
+ ctx *service.SessionContext,
+ path string,
+ pkey *ed25519.PublicKey,
+ kind RRTypeList,
+ mode int,
+ depth int) (set *message.GNSRecordSet, err error) {
+
// check for recursion depth
if depth > config.Cfg.GNS.MaxDepth {
return nil, ErrGNSRecursionExceeded
@@ -130,14 +139,20 @@ func (gns *GNSModule) Resolve(path string, pkey
*ed25519.PublicKey, kind RRTypeL
// check for relative path
if pkey != nil {
//resolve relative path
- return gns.ResolveRelative(names, pkey, kind, mode, depth)
+ return gns.ResolveRelative(ctx, names, pkey, kind, mode, depth)
}
// resolve absolute path
- return gns.ResolveAbsolute(names, kind, mode, depth)
+ return gns.ResolveAbsolute(ctx, names, kind, mode, depth)
}
// Resolve a fully qualified GNS absolute name (with multiple labels).
-func (gns *GNSModule) ResolveAbsolute(labels []string, kind RRTypeList, mode
int, depth int) (set *message.GNSRecordSet, err error) {
+func (gns *GNSModule) ResolveAbsolute(
+ ctx *service.SessionContext,
+ labels []string,
+ kind RRTypeList,
+ mode int,
+ depth int) (set *message.GNSRecordSet, err error) {
+
// get the zone key for the TLD
pkey := gns.GetZoneKey(labels[0])
if pkey == nil {
@@ -146,12 +161,19 @@ func (gns *GNSModule) ResolveAbsolute(labels []string,
kind RRTypeList, mode int
return
}
// continue with resolution relative to a zone.
- return gns.ResolveRelative(labels[1:], pkey, kind, mode, depth)
+ return gns.ResolveRelative(ctx, labels[1:], pkey, kind, mode, depth)
}
// Resolve relative path (to a given zone) recursively by processing simple
// (PKEY,Label) lookups in sequence and handle intermediate GNS record types
-func (gns *GNSModule) ResolveRelative(labels []string, pkey
*ed25519.PublicKey, kind RRTypeList, mode int, depth int) (set
*message.GNSRecordSet, err error) {
+func (gns *GNSModule) ResolveRelative(
+ ctx *service.SessionContext,
+ labels []string,
+ pkey *ed25519.PublicKey,
+ kind RRTypeList,
+ mode int,
+ depth int) (set *message.GNSRecordSet, err error) {
+
// Process all names in sequence
var (
records []*message.GNSResourceRecord // final resource records
from resolution
@@ -162,7 +184,7 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey
*ed25519.PublicKey,
// resolve next level
var block *message.GNSBlock
- if block, err = gns.Lookup(pkey, labels[0], mode); err != nil {
+ if block, err = gns.Lookup(ctx, pkey, labels[0], mode); err !=
nil {
// failed to resolve name
return
}
@@ -225,10 +247,23 @@ func (gns *GNSModule) ResolveRelative(labels []string,
pkey *ed25519.PublicKey,
lbls += "."
}
fqdn := lbls + inst.Query
- if set, err = gns.ResolveDNS(fqdn, inst.Servers, kind,
pkey, depth); err != nil {
+ if set, err = gns.ResolveDNS(ctx, fqdn, inst.Servers,
kind, pkey, depth); err != nil {
logger.Println(logger.ERROR, "[gns] GNS2DNS
resolution failed.")
return
}
+ // add synthetic LEHO record if we have results and are
at the
+ // end of the name (labels).
+ if len(set.Records) > 0 && len(labels) == 1 {
+ // add LEHO supplemental record: The TTL of the
new record is
+ // the longest-living record in the current set.
+ expires := util.AbsoluteTimeNow()
+ for _, rec := range set.Records {
+ if rec.Expires.Compare(expires) > 0 {
+ expires = rec.Expires
+ }
+ }
+ set.Records = append(set.Records,
gns.newLEHORecord(inst.Query, expires))
+ }
// we are done with resolution; pass on records to
caller
records = set.Records
break
@@ -250,7 +285,7 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey
*ed25519.PublicKey,
break
}
logger.Println(logger.DBG, "[gns] CNAME resolution
required.")
- if set, err = gns.ResolveUnknown(inst.name, labels,
pkey, kind, depth+1); err != nil {
+ if set, err = gns.ResolveUnknown(ctx, inst.name,
labels, pkey, kind, depth+1); err != nil {
logger.Println(logger.ERROR, "[gns] CNAME
resolution failed.")
return
}
@@ -300,7 +335,14 @@ func (gns *GNSModule) ResolveRelative(labels []string,
pkey *ed25519.PublicKey,
// relative to the zone PKEY. If the name is an absolute GNS name (ending in
// a PKEY TLD), it is also resolved with GNS. All other names are resolved
// via DNS queries.
-func (gns *GNSModule) ResolveUnknown(name string, labels []string, pkey
*ed25519.PublicKey, kind RRTypeList, depth int) (set *message.GNSRecordSet, err
error) {
+func (gns *GNSModule) ResolveUnknown(
+ ctx *service.SessionContext,
+ name string,
+ labels []string,
+ pkey *ed25519.PublicKey,
+ kind RRTypeList,
+ depth int) (set *message.GNSRecordSet, err error) {
+
// relative GNS-based server name?
if strings.HasSuffix(name, ".+") {
// resolve server name relative to current zone
@@ -308,14 +350,14 @@ func (gns *GNSModule) ResolveUnknown(name string, labels
[]string, pkey *ed25519
for _, label := range util.ReverseStringList(labels) {
name += "." + label
}
- if set, err = gns.Resolve(name, pkey, kind,
enums.GNS_LO_DEFAULT, depth+1); err != nil {
+ if set, err = gns.Resolve(ctx, name, pkey, kind,
enums.GNS_LO_DEFAULT, depth+1); err != nil {
return
}
} else {
// check for absolute GNS name (with PKEY as TLD)
if zk := gns.GetZoneKey(name); zk != nil {
// resolve absolute GNS name (name ends in a PKEY)
- if set, err = gns.Resolve(util.StripPathRight(name),
zk, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil {
+ if set, err = gns.Resolve(ctx,
util.StripPathRight(name), zk, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil
{
return
}
} else {
@@ -342,13 +384,17 @@ func (gns *GNSModule) GetZoneKey(path string)
*ed25519.PublicKey {
}
// Lookup name in GNS.
-func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, label string, mode int)
(block *message.GNSBlock, err error) {
+func (gns *GNSModule) Lookup(
+ ctx *service.SessionContext,
+ pkey *ed25519.PublicKey,
+ label string,
+ mode int) (block *message.GNSBlock, err error) {
// create query (lookup key)
query := NewQuery(pkey, label)
// try local lookup first
- if block, err = gns.LookupLocal(query); err != nil {
+ if block, err = gns.LookupLocal(ctx, query); err != nil {
logger.Printf(logger.ERROR, "[gns] local Lookup: %s\n",
err.Error())
block = nil
return
@@ -356,9 +402,17 @@ func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey,
label string, mode int) (b
if block == nil {
if mode == enums.GNS_LO_DEFAULT {
// get the block from a remote lookup
- if block, err = gns.LookupRemote(query); err != nil ||
block == nil {
+ if block, err = gns.LookupRemote(ctx, query); err !=
nil || block == nil {
if err != nil {
- logger.Printf(logger.ERROR, "[gns]
remote Lookup: %s\n", err.Error())
+ // check for aborted remote lookup: we
need to cancel the query
+ if err ==
transport.ErrChannelInterrupted {
+ logger.Println(logger.WARN,
"[gns] remote Lookup aborted -- cleaning up.")
+ if err = gns.CancelRemote(ctx,
query); err != nil {
+
logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n",
err.Error())
+ }
+ } else {
+ logger.Printf(logger.ERROR,
"[gns] remote Lookup failed: %s\n", err.Error())
+ }
block = nil
} else {
logger.Println(logger.DBG, "[gns]
remote Lookup: no block found")
@@ -367,8 +421,21 @@ func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey,
label string, mode int) (b
return
}
// store RRs from remote locally.
- gns.StoreLocal(block)
+ gns.StoreLocal(ctx, block)
}
}
return
}
+
+// newLEHORecord creates a new supplemental GNS record of type LEHO.
+func (gns *GNSModule) newLEHORecord(name string, expires util.AbsoluteTime)
*message.GNSResourceRecord {
+ rr := new(message.GNSResourceRecord)
+ rr.Expires = expires
+ rr.Flags = uint32(enums.GNS_FLAG_SUPPL)
+ rr.Type = uint32(enums.GNS_TYPE_LEHO)
+ rr.Size = uint32(len(name) + 1)
+ rr.Data = make([]byte, rr.Size)
+ copy(rr.Data, []byte(name))
+ rr.Data[len(name)] = 0
+ return rr
+}
diff --git a/src/gnunet/service/gns/service.go
b/src/gnunet/service/gns/service.go
index 1a62e77..2526ae8 100644
--- a/src/gnunet/service/gns/service.go
+++ b/src/gnunet/service/gns/service.go
@@ -22,7 +22,6 @@ import (
"encoding/hex"
"fmt"
"io"
- "sync"
"gnunet/config"
"gnunet/crypto"
@@ -60,6 +59,7 @@ func NewGNSService() service.Service {
inst.LookupLocal = inst.LookupNamecache
inst.StoreLocal = inst.StoreNamecache
inst.LookupRemote = inst.LookupDHT
+ inst.CancelRemote = inst.CancelDHT
return inst
}
@@ -74,15 +74,18 @@ func (s *GNSService) Stop() error {
}
// Serve a client channel.
-func (s *GNSService) ServeClient(wg *sync.WaitGroup, mc *transport.MsgChannel)
{
- defer wg.Done()
+func (s *GNSService) ServeClient(ctx *service.SessionContext, mc
*transport.MsgChannel) {
+
loop:
for {
// receive next message from client
- msg, err := mc.Receive()
+ logger.Printf(logger.DBG, "[gns] Waiting for message in session
'%d'...\n", ctx.Id)
+ msg, err := mc.Receive(ctx.Signaller())
if err != nil {
if err == io.EOF {
logger.Println(logger.INFO, "[gns] Client
channel closed.")
+ } else if err == transport.ErrChannelInterrupted {
+ logger.Println(logger.INFO, "[gns] Service
operation interrupted.")
} else {
logger.Printf(logger.ERROR, "[gns]
Message-receive failed: %s\n", err.Error())
}
@@ -102,23 +105,28 @@ loop:
resp = respX
// perform lookup on block (locally and remote)
- wg.Add(1)
go func() {
+ ctx.Add()
defer func() {
// send response
- if err := mc.Send(resp); err != nil {
- logger.Printf(logger.ERROR,
"[gns] Failed to send response: %s\n", err.Error())
+ if resp != nil {
+ if err := mc.Send(resp,
ctx.Signaller()); err != nil {
+
logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error())
+ }
}
// go-routine finished
- wg.Done()
+ ctx.Remove()
}()
pkey := ed25519.NewPublicKeyFromBytes(m.Zone)
label := m.GetName()
kind := NewRRTypeList(int(m.Type))
- recset, err := s.Resolve(label, pkey, kind,
int(m.Options), 0)
+ recset, err := s.Resolve(ctx, label, pkey,
kind, int(m.Options), 0)
if err != nil {
logger.Printf(logger.ERROR, "[gns]
Failed to lookup block: %s\n", err.Error())
+ if err ==
transport.ErrChannelInterrupted {
+ resp = nil
+ }
return
}
// handle records
@@ -151,12 +159,16 @@ loop:
break loop
}
}
+ // cancel all tasks running for this session/connection
+ logger.Printf(logger.INFO, "[gns] Start closing session '%d'...\n",
ctx.Id)
+ ctx.Cancel()
+
// close client connection
mc.Close()
}
// LookupNamecache
-func (s *GNSService) LookupNamecache(query *Query) (block *message.GNSBlock,
err error) {
+func (s *GNSService) LookupNamecache(ctx *service.SessionContext, query
*Query) (block *message.GNSBlock, err error) {
logger.Printf(logger.DBG, "[gns] LookupNamecache(%s)...\n",
hex.EncodeToString(query.Key.Bits))
// assemble Namecache request
@@ -166,7 +178,7 @@ func (s *GNSService) LookupNamecache(query *Query) (block
*message.GNSBlock, err
// get response from Namecache service
var resp message.Message
- if resp, err = service.ServiceRequestResponse("gns", "Namecache",
config.Cfg.Namecache.Endpoint, req); err != nil {
+ if resp, err = service.ServiceRequestResponse(ctx, "gns", "Namecache",
config.Cfg.Namecache.Endpoint, req); err != nil {
return
}
@@ -219,7 +231,7 @@ func (s *GNSService) LookupNamecache(query *Query) (block
*message.GNSBlock, err
}
// StoreNamecache
-func (s *GNSService) StoreNamecache(block *message.GNSBlock) (err error) {
+func (s *GNSService) StoreNamecache(ctx *service.SessionContext, block
*message.GNSBlock) (err error) {
logger.Println(logger.DBG, "[gns] StoreNamecache()...")
// assemble Namecache request
@@ -228,7 +240,7 @@ func (s *GNSService) StoreNamecache(block
*message.GNSBlock) (err error) {
// get response from Namecache service
var resp message.Message
- if resp, err = service.ServiceRequestResponse("gns", "Namecache",
config.Cfg.Namecache.Endpoint, req); err != nil {
+ if resp, err = service.ServiceRequestResponse(ctx, "gns", "Namecache",
config.Cfg.Namecache.Endpoint, req); err != nil {
return
}
@@ -255,7 +267,7 @@ func (s *GNSService) StoreNamecache(block
*message.GNSBlock) (err error) {
}
// LookupDHT
-func (s *GNSService) LookupDHT(query *Query) (block *message.GNSBlock, err
error) {
+func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query)
(block *message.GNSBlock, err error) {
logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n",
hex.EncodeToString(query.Key.Bits))
// assemble DHT request
@@ -268,7 +280,7 @@ func (s *GNSService) LookupDHT(query *Query) (block
*message.GNSBlock, err error
// get response from DHT service
var resp message.Message
- if resp, err = service.ServiceRequestResponse("gns", "DHT",
config.Cfg.DHT.Endpoint, req); err != nil {
+ if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT",
config.Cfg.DHT.Endpoint, req); err != nil {
return
}
@@ -313,9 +325,36 @@ func (s *GNSService) LookupDHT(query *Query) (block
*message.GNSBlock, err error
// we got a result from DHT that was not in the namecache,
// so store it there now.
- if err = s.StoreNamecache(block); err != nil {
+ if err = s.StoreNamecache(ctx, block); err != nil {
logger.Printf(logger.ERROR, "[gns] can't store block in
Namecache: %s\n", err.Error())
}
}
return
}
+
+// CancelDHT
+func (s *GNSService) CancelDHT(ctx *service.SessionContext, query *Query) (err
error) {
+ logger.Printf(logger.DBG, "[gns] CancelDHT(%s)...\n",
hex.EncodeToString(query.Key.Bits))
+
+ // assemble DHT request
+ req := message.NewDHTClientGetStopMsg(query.Key)
+ req.Id = uint64(util.NextID())
+
+ // get response from DHT service
+ var resp message.Message
+ if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT",
config.Cfg.DHT.Endpoint, req); err != nil {
+ return
+ }
+
+ // handle message depending on its type
+ logger.Println(logger.DBG, "[gns] Handling response from DHT service")
+ switch m := resp.(type) {
+ case *message.DHTClientResultMsg:
+ // check for matching IDs
+ if m.Id != req.Id {
+ logger.Println(logger.ERROR, "[gns] Got response for
unknown ID")
+ break
+ }
+ }
+ return
+}
diff --git a/src/gnunet/service/namecache/module.go
b/src/gnunet/service/namecache/module.go
index 6a10625..a205444 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/namecache/module.go
@@ -20,6 +20,7 @@ package namecache
import (
"gnunet/message"
+ "gnunet/service"
"gnunet/service/gns"
)
@@ -35,10 +36,10 @@ import (
type NamecacheModule struct {
}
-func (nc *NamecacheModule) Get(query *gns.Query) (*message.GNSBlock, error) {
+func (nc *NamecacheModule) Get(ctx *service.SessionContext, query *gns.Query)
(*message.GNSBlock, error) {
return nil, nil
}
-func (nc *NamecacheModule) Put(block *message.GNSBlock) error {
+func (nc *NamecacheModule) Put(ctx *service.SessionContext, block
*message.GNSBlock) error {
return nil
}
diff --git a/src/gnunet/service/service.go b/src/gnunet/service/service.go
index d484a95..1017e0b 100644
--- a/src/gnunet/service/service.go
+++ b/src/gnunet/service/service.go
@@ -33,19 +33,21 @@ import (
// Channel semantics in the specification string.
type Service interface {
Start(spec string) error
- ServeClient(wg *sync.WaitGroup, ch *transport.MsgChannel)
+ ServeClient(ctx *SessionContext, ch *transport.MsgChannel)
Stop() error
}
// ServiceImpl is an implementation of generic service functionality.
type ServiceImpl struct {
- impl Service
- hdlr chan transport.Channel
- ctrl chan bool
- srvc transport.ChannelServer
- wg *sync.WaitGroup
- name string
- running bool
+ impl Service // Specific service implementation
+ hdlr chan transport.Channel // Channel from listener
+ ctrl chan bool // Control channel
+ drop chan int // Channel to drop a session from
pending list
+ srvc transport.ChannelServer // multi-user service
+ wg *sync.WaitGroup // wait group for go routine
synchronization
+ name string // service name
+ running bool // service currently running?
+ pending map[int]*SessionContext // list of pending sessions
}
// NewServiceImpl instantiates a new ServiceImpl object.
@@ -54,10 +56,12 @@ func NewServiceImpl(name string, srv Service) *ServiceImpl {
impl: srv,
hdlr: make(chan transport.Channel),
ctrl: make(chan bool),
+ drop: make(chan int),
srvc: nil,
wg: new(sync.WaitGroup),
name: name,
running: false,
+ pending: make(map[int]*SessionContext),
}
}
@@ -83,6 +87,8 @@ func (si *ServiceImpl) Start(spec string) (err error) {
loop:
for si.running {
select {
+
+ // handle incoming connections
case in := <-si.hdlr:
if in == nil {
logger.Printf(logger.INFO, "[%s]
Listener terminated.\n", si.name)
@@ -90,14 +96,38 @@ func (si *ServiceImpl) Start(spec string) (err error) {
}
switch ch := in.(type) {
case transport.Channel:
- logger.Printf(logger.INFO, "[%s] Client
connected.\n", si.name)
- si.wg.Add(1)
- go si.impl.ServeClient(si.wg,
transport.NewMsgChannel(ch))
+ // run a new session with context
+ ctx := NewSessionContext()
+ sessId := ctx.Id
+ si.pending[sessId] = ctx
+ logger.Printf(logger.INFO, "[%s]
Session '%d' started.\n", si.name, sessId)
+
+ go func() {
+ // serve client on the message
channel
+ si.impl.ServeClient(ctx,
transport.NewMsgChannel(ch))
+ // session is done now.
+ logger.Printf(logger.INFO,
"[%s] Session with client '%d' ended.\n", si.name, sessId)
+ si.drop <- sessId
+ }()
}
+
+ // handle session removal
+ case sessId := <-si.drop:
+ delete(si.pending, sessId)
+
+ // handle cancelation signal on listener.
case <-si.ctrl:
break loop
}
}
+
+ // terminate pending sessions
+ for _, ctx := range si.pending {
+ logger.Printf(logger.DBG, "[%s] Session '%d'
closing...\n", si.name, ctx.Id)
+ ctx.Cancel()
+ }
+
+ // close-down service
logger.Printf(logger.INFO, "[%s] Service closing.\n", si.name)
si.srvc.Close()
si.running = false
diff --git a/src/gnunet/transport/channel.go b/src/gnunet/transport/channel.go
index 092ab8a..4ba49ac 100644
--- a/src/gnunet/transport/channel.go
+++ b/src/gnunet/transport/channel.go
@@ -28,6 +28,7 @@ import (
"gnunet/message"
"gnunet/util"
+ "github.com/bfix/gospel/concurrent"
"github.com/bfix/gospel/data"
"github.com/bfix/gospel/logger"
)
@@ -36,6 +37,8 @@ import (
var (
ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented")
ErrChannelNotOpened = fmt.Errorf("Channel not opened")
+ ErrChannelInterrupted = fmt.Errorf("Channel interrupted")
+ ErrChannelClosed = fmt.Errorf("Channel closed")
)
////////////////////////////////////////////////////////////////////////
@@ -49,10 +52,11 @@ var (
// "tcp+1.2.3.4:5" -- for TCP channels
// "udp+1.2.3.4:5" -- for UDP channels
type Channel interface {
- Open(spec string) error
- Close() error
- Read([]byte) (int, error)
- Write([]byte) (int, error)
+ Open(spec string) error // open channel (for
read/write)
+ Close() error // close open channel
+ IsOpen() bool // check if channel
is open
+ Read([]byte, *concurrent.Signaller) (int, error) // read from channel
+ Write([]byte, *concurrent.Signaller) (int, error) // write to channel
}
// ChannelFactory instantiates specific Channel implementations.
@@ -140,7 +144,12 @@ func (c *MsgChannel) Close() error {
}
// Send a GNUnet message over a channel.
-func (c *MsgChannel) Send(msg message.Message) error {
+func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller)
error {
+
+ // check for closed channel
+ if !c.ch.IsOpen() {
+ return ErrChannelClosed
+ }
// convert message to binary data
data, err := data.Marshal(msg)
@@ -160,7 +169,7 @@ func (c *MsgChannel) Send(msg message.Message) error {
}
// send packet
- n, err := c.ch.Write(data)
+ n, err := c.ch.Write(data, sig)
if err != nil {
return err
}
@@ -171,9 +180,15 @@ func (c *MsgChannel) Send(msg message.Message) error {
}
// Receive GNUnet messages over a plain Channel.
-func (c *MsgChannel) Receive() (message.Message, error) {
+func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message,
error) {
+ // check for closed channel
+ if !c.ch.IsOpen() {
+ return nil, ErrChannelClosed
+ }
+
+ // get bytes from channel
get := func(pos, count int) error {
- n, err := c.ch.Read(c.buf[pos : pos+count])
+ n, err := c.ch.Read(c.buf[pos:pos+count], sig)
if err != nil {
return err
}
@@ -182,6 +197,7 @@ func (c *MsgChannel) Receive() (message.Message, error) {
}
return nil
}
+
if err := get(0, 4); err != nil {
return nil, err
}
diff --git a/src/gnunet/transport/channel_netw.go
b/src/gnunet/transport/channel_netw.go
index c56c089..4f0ba4a 100644
--- a/src/gnunet/transport/channel_netw.go
+++ b/src/gnunet/transport/channel_netw.go
@@ -24,9 +24,30 @@ import (
"strconv"
"strings"
+ "github.com/bfix/gospel/concurrent"
"github.com/bfix/gospel/logger"
)
+// ChannelResult for read/write operations on channels.
+type ChannelResult struct {
+ count int // number of bytes read/written
+ err error // error (or nil)
+}
+
+// NewChannelResult instantiates a new object with given attributes.
+func NewChannelResult(n int, err error) *ChannelResult {
+ return &ChannelResult{
+ count: n,
+ err: err,
+ }
+}
+
+// Values() returns the attributes of a result instance (for passing up the
+// call stack).
+func (cr *ChannelResult) Values() (int, error) {
+ return cr.count, cr.err
+}
+
////////////////////////////////////////////////////////////////////////
// Generic network-based Channel
@@ -64,27 +85,86 @@ func (c *NetworkChannel) Open(spec string) (err error) {
// Close a network channel
func (c *NetworkChannel) Close() error {
if c.conn != nil {
- return c.conn.Close()
+ rc := c.conn.Close()
+ c.conn = nil
+ return rc
}
return ErrChannelNotOpened
}
+// IsOpen returns true if the channel is opened
+func (c *NetworkChannel) IsOpen() bool {
+ return c.conn != nil
+}
+
// Read bytes from a network channel into buffer: Returns the number of read
// bytes and an error code. Only works on open channels ;)
-func (c *NetworkChannel) Read(buf []byte) (int, error) {
+// The read can be aborted by sending 'true' via the signaller 'sig'; the
+// channel is closed after such an interruption.
+func (c *NetworkChannel) Read(buf []byte, sig *concurrent.Signaller) (int,
error) {
+ // check if the channel is open
if c.conn == nil {
return 0, ErrChannelNotOpened
}
- return c.conn.Read(buf)
+ // perform operation in go-routine
+ result := make(chan *ChannelResult)
+ go func() {
+ result <- NewChannelResult(c.conn.Read(buf))
+ }()
+
+ listener := sig.Listen()
+ defer sig.Drop(listener)
+ for {
+ select {
+ // handle terminate command
+ case x := <-listener:
+ switch val := x.(type) {
+ case bool:
+ if val {
+ c.conn.Close()
+ c.conn = nil
+ return 0, ErrChannelInterrupted
+ }
+ }
+ // handle result of read operation
+ case res := <-result:
+ return res.Values()
+ }
+ }
}
// Write buffer to a network channel: Returns the number of written bytes and
-// an error code.
-func (c *NetworkChannel) Write(buf []byte) (int, error) {
+// an error code. The write operation can be aborted by sending 'true' on the
+// command channel; the network channel is closed after such interrupt.
+func (c *NetworkChannel) Write(buf []byte, sig *concurrent.Signaller) (int,
error) {
+ // check if we have an open channel to write to.
if c.conn == nil {
return 0, ErrChannelNotOpened
}
- return c.conn.Write(buf)
+ // perform operation in go-routine
+ result := make(chan *ChannelResult)
+ go func() {
+ result <- NewChannelResult(c.conn.Write(buf))
+ }()
+
+ listener := sig.Listen()
+ defer sig.Drop(listener)
+ for {
+ select {
+ // handle terminate command
+ case x := <-listener:
+ switch val := x.(type) {
+ case bool:
+ if val {
+ c.conn.Close()
+ return 0, ErrChannelInterrupted
+ }
+ }
+ // handle result of write operation
+ case res := <-result:
+ return res.Values()
+ }
+ }
}
////////////////////////////////////////////////////////////////////////
diff --git a/src/gnunet/transport/connection.go
b/src/gnunet/transport/connection.go
index e2cfae2..03549fc 100644
--- a/src/gnunet/transport/connection.go
+++ b/src/gnunet/transport/connection.go
@@ -21,6 +21,8 @@ package transport
import (
"gnunet/core"
"gnunet/message"
+
+ "github.com/bfix/gospel/concurrent"
)
// Connection for communicating peers
@@ -67,11 +69,11 @@ func (c *Connection) Close() error {
}
// Send a message on the connection
-func (c *Connection) Send(msg message.Message) error {
- return c.ch.Send(msg)
+func (c *Connection) Send(msg message.Message, sig *concurrent.Signaller)
error {
+ return c.ch.Send(msg, sig)
}
// Receive a message on the connection
-func (c *Connection) Receive() (message.Message, error) {
- return c.ch.Receive()
+func (c *Connection) Receive(sig *concurrent.Signaller) (message.Message,
error) {
+ return c.ch.Receive(sig)
}
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go
index afad974..80f5c84 100644
--- a/src/gnunet/util/misc.go
+++ b/src/gnunet/util/misc.go
@@ -22,6 +22,7 @@ import (
"strings"
)
+// CounterMap
type CounterMap map[interface{}]int
func (cm CounterMap) Add(i interface{}) int {
diff --git a/src/gnunet/util/time.go b/src/gnunet/util/time.go
index 3e09791..e1e0e30 100644
--- a/src/gnunet/util/time.go
+++ b/src/gnunet/util/time.go
@@ -78,6 +78,25 @@ func (t AbsoluteTime) Expired() bool {
return t.Val < uint64(time.Now().Unix())
}
+// Compare two times: -1 = (t < t2), 0 = (t == t2), 1 = (t > t2)
+func (t AbsoluteTime) Compare(t2 AbsoluteTime) int {
+ if t.Val == math.MaxUint64 {
+ if t2.Val == math.MaxUint64 {
+ return 0
+ }
+ return 1
+ }
+ if t2.Val == math.MaxUint64 {
+ return -1
+ }
+ if t.Val < t2.Val {
+ return -1
+ } else if t.Val == t2.Val {
+ return 0
+ }
+ return 1
+}
+
//----------------------------------------------------------------------
// Relative time
//----------------------------------------------------------------------
--
To stop receiving notification emails like this one, please contact
address@hidden.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet-go] branch master updated: Handle DHT lookup aborts.,
gnunet <=