openidec

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

net.go (8406B)


      1 // Network operations: fetch, post/get message from point
      2 // Check node extensions.
      3 
      4 package ii
      5 
      6 import (
      7 	"bufio"
      8 	"encoding/base64"
      9 	"errors"
     10 	"fmt"
     11 	"io"
     12 	"io/ioutil"
     13 	"net/http"
     14 
     15 	//	"net/smtp"
     16 	"net/url"
     17 	"strings"
     18 	"sync"
     19 )
     20 
// Node describes a remote node. Use Connect to create it.
// Host: node URL (Connect stores it without a trailing "/")
// Features: set of extensions reported by the node; the key is the
// extension name as sent by /x/features
// Force: when set, Fetcher syncs an echo even if the newest message
// of the node is already in the local db
type Node struct {
	Host     string
	Features map[string]bool
	Force    bool
}
     30 
     31 // utility function to make get request and call fn
     32 // for every line. Stops on EOF or fn return false.
     33 func http_req_lines(url string, fn func(string) bool) error {
     34 	resp, err := http.Get(url)
     35 	if err != nil {
     36 		return err
     37 	}
     38 	defer resp.Body.Close()
     39 	reader := bufio.NewReader(resp.Body)
     40 	for {
     41 		line, err := reader.ReadString('\n')
     42 		if err != nil && err != io.EOF {
     43 			return err
     44 		}
     45 		line = strings.TrimSuffix(line, "\n")
     46 		if err == io.EOF {
     47 			if line != "" { /* node do not send final \n */
     48 				fn(line)
     49 			}
     50 			break
     51 		}
     52 		if !fn(line) {
     53 			break
     54 		}
     55 	}
     56 	return nil
     57 }
     58 
     59 // short variant of http_get_lines. Read one line and
     60 // interpret it as message id. Return it.
     61 func http_get_id(url string) (string, error) {
     62 	res := ""
     63 	if err := http_req_lines(url, func(line string) bool {
     64 		if strings.Contains(line, ".") {
     65 			return true
     66 		}
     67 		res += line
     68 		return true
     69 	}); err != nil {
     70 		return "", err
     71 	}
     72 	return res, nil
     73 }
     74 
     75 // Fetcher internal goroutine.
     76 // DB: db to write
     77 // Echo: echo to fetch
     78 // wait: sync for Fetch master to detect finishing of work
     79 // cond: used for wake-up new goroutines
     80 // Can work in different modes.
     81 // If limit > 0, just fetch last [limit] messages (-limit:limit slice)
     82 // if limit < 0, use adaptive mode, probe (-(2*n)* limit:1) messages
     83 // untill find old message.
     84 // if node does not support u/e slices, than full sync performed
     85 // if node connection is not in Force mode, do not perform sync if not needed
     86 func (n *Node) Fetcher(db *DB, Echo string, limit int, wait *sync.WaitGroup, cond *sync.Cond) {
     87 	defer func() {
     88 		cond.L.Lock()
     89 		cond.Broadcast()
     90 		cond.L.Unlock()
     91 	}()
     92 	defer wait.Done()
     93 	if n.IsFeature("u/e") { /* fast path */
     94 		if !n.Force {
     95 			id, err := http_get_id(n.Host + "/u/e/" + Echo + "/-1:1")
     96 			if !IsMsgId(id) {
     97 				Info.Printf("%s: no valid MsgId", Echo)
     98 				return
     99 			}
    100 			if err == nil && db.Exists(id) != nil { /* no sync needed */
    101 				Info.Printf("%s: no sync needed", Echo)
    102 				return
    103 			}
    104 		}
    105 		if limit < 0 {
    106 			limit = -limit
    107 			try := 0
    108 			for { // adaptive
    109 				if try > 16 { /* fallback to old scheme */
    110 					limit = 0
    111 					break
    112 				}
    113 				id, err := http_get_id(fmt.Sprintf("%s/u/e/%s/%d:1",
    114 					n.Host, Echo, -limit))
    115 				if err != nil { /* fallback to old scheme */
    116 					limit = 0
    117 					break
    118 				}
    119 				if db.Exists(id) != nil {
    120 					break
    121 				}
    122 				try++
    123 				limit *= 2
    124 			}
    125 		}
    126 	} else {
    127 		limit = 0
    128 	}
    129 	req := fmt.Sprintf("%s/u/e/%s", n.Host, Echo)
    130 	if limit > 0 {
    131 		req = fmt.Sprintf("%s/%d:%d", req, -limit, limit)
    132 	}
    133 	Info.Printf("Get %s", req)
    134 	var res []string
    135 	if err := http_req_lines(req, func(line string) bool {
    136 		if strings.Contains(line, ".") {
    137 			return true
    138 		}
    139 		if db.Exists(line) == nil {
    140 			res = append(res, line)
    141 		}
    142 		return true
    143 	}); err != nil {
    144 		return
    145 	}
    146 
    147 	// FIXME: handle this error
    148 	_ = n.Store(db, res)
    149 }
    150 
// Do not run more than MaxConnections fetcher goroutines at the same time.
// Used by Fetch.
var MaxConnections = 6
    153 
    154 // Send point message to node using GET method of /u/point scheme.
    155 // pauth: secret string. msg - raw message in plaintext
    156 // returns error
    157 func (n *Node) Send(pauth string, msg string) error {
    158 	msg = base64.URLEncoding.EncodeToString([]byte(msg))
    159 	//	msg = url.QueryEscape(msg)
    160 	req := fmt.Sprintf("%s/u/point/%s/%s", n.Host, pauth, msg)
    161 	resp, err := http.Get(req)
    162 	Trace.Printf("Get %s", req)
    163 	if err != nil {
    164 		return err
    165 	}
    166 	buf, err := ioutil.ReadAll(resp.Body)
    167 	if strings.HasPrefix(string(buf), "msg ok") {
    168 		Trace.Printf("Server responced msg ok")
    169 		return nil
    170 	} else if len(buf) > 0 {
    171 		err = errors.New(string(buf))
    172 	}
    173 	if err == nil {
    174 		err = errors.New("Server did not response with ok")
    175 	}
    176 	return err
    177 }
    178 
    179 // Send point message to node using POST method of /u/point scheme.
    180 // pauth: secret string. msg - raw message in plaintext
    181 // returns error
    182 func (n *Node) Post(pauth string, msg string) error {
    183 	msg = base64.StdEncoding.EncodeToString([]byte(msg))
    184 	// msg = url.QueryEscape(msg)
    185 	postData := url.Values{
    186 		"pauth": {pauth},
    187 		"tmsg":  {msg},
    188 	}
    189 	resp, err := http.PostForm(n.Host+"/u/point", postData)
    190 	Trace.Printf("Post %s/u/point", n.Host)
    191 	if err != nil {
    192 		return err
    193 	}
    194 	buf, err := ioutil.ReadAll(resp.Body)
    195 	if strings.HasPrefix(string(buf), "msg ok") {
    196 		Trace.Printf("Server responced msg ok")
    197 		return nil
    198 	} else if len(buf) > 0 {
    199 		err = errors.New(string(buf))
    200 	}
    201 	if err == nil {
    202 		err = errors.New("Server did not response with ok")
    203 	}
    204 	return err
    205 }
    206 
    207 // Return list.txt in []string if node supports it.
    208 // WARNING: Only echo names are returned! Each string is just echoarea.
    209 // Used for fetch all mode.
    210 func (n *Node) List() ([]string, error) {
    211 	var list []string
    212 	if !n.IsFeature("list.txt") {
    213 		return list, nil
    214 	}
    215 	if err := http_req_lines(n.Host+"/list.txt", func(line string) bool {
    216 		list = append(list, strings.Split(line, ":")[0])
    217 		return true
    218 	}); err != nil {
    219 		return list, err
    220 	}
    221 	return list, nil
    222 }
    223 
    224 // Fetch and write selected messages in db.
    225 // ids: selected message ids.
    226 // db: Database.
    227 // This function make /u/m request, decodes bundles, checks,
    228 // and write them to db (line by line).
    229 func (n *Node) Store(db *DB, ids []string) error {
    230 	req := ""
    231 	var nreq int
    232 	count := len(ids)
    233 	Trace.Printf("Get and store messages")
    234 	for i := 0; i < count; i++ {
    235 		req = req + "/" + string(ids[i])
    236 		nreq++
    237 		if nreq < 8 && i < count-1 {
    238 			continue
    239 		}
    240 		if err := http_req_lines(n.Host+"/u/m"+req, func(b string) bool {
    241 			m, e := DecodeBundle(b)
    242 			if e != nil {
    243 				Error.Printf("Can not decode message %s (%s)\n", b, e)
    244 				return true
    245 			}
    246 			if e := db.Store(m); e != nil {
    247 				Error.Printf("Can not write message %s (%s)\n", m.MsgId, e)
    248 			}
    249 			return true
    250 		}); err != nil {
    251 			return err
    252 		}
    253 		nreq = 0
    254 		req = ""
    255 	}
    256 	return nil
    257 }
    258 
    259 // This is Fetcher master function. It makes fetch from node
    260 // and run goroutines in parralel mode (one goroutine per echo).
    261 // Echolist: list with echoarea names. If list is empty,
    262 // function will try to get list via list.txt request.
    263 // limit: see Fetcher function. Describe fetching mode/limit.
    264 func (n *Node) Fetch(db *DB, Echolist []string, limit int) error {
    265 	if len(Echolist) == 0 {
    266 		Echolist, _ = n.List()
    267 	}
    268 	if Echolist == nil {
    269 		return nil
    270 	}
    271 	var wait sync.WaitGroup
    272 	cond := sync.NewCond(&sync.Mutex{})
    273 	num := 0
    274 	Info.Printf("Start fetcher(s) for %s", n.Host)
    275 	for _, v := range Echolist {
    276 		if !IsEcho(v) {
    277 			if strings.Trim(v, " ") != "" {
    278 				Trace.Printf("Skip echo: %s", v)
    279 			}
    280 			continue
    281 		}
    282 		wait.Add(1)
    283 		num += 1
    284 		if num >= MaxConnections { /* add per one */
    285 			cond.L.Lock()
    286 			Trace.Printf("Start fetcher for: %s", v)
    287 			go n.Fetcher(db, v, limit, &wait, cond)
    288 			Trace.Printf("Waiting free thread")
    289 			cond.Wait()
    290 			cond.L.Unlock()
    291 		} else {
    292 			Trace.Printf("Start fetcher for: %s", v)
    293 			go n.Fetcher(db, v, limit, &wait, cond)
    294 		}
    295 	}
    296 	Trace.Printf("Waiting thread(s)")
    297 	wait.Wait()
    298 	return nil
    299 }
    300 
    301 // Check if node has feature?
    302 // Features are gets while Connect call.
    303 func (n *Node) IsFeature(f string) bool {
    304 	_, ok := n.Features[f]
    305 	return ok
    306 }
    307 
    308 // Connect to node, get features and returns
    309 // pointer to Node object.
    310 func Connect(addr string) (*Node, error) {
    311 	var n Node
    312 	n.Host = strings.TrimSuffix(addr, "/")
    313 	n.Features = make(map[string]bool)
    314 	if err := http_req_lines(n.Host+"/x/features", func(line string) bool {
    315 		n.Features[line] = true
    316 		Trace.Printf("%s supports %s", n.Host, line)
    317 		return true
    318 	}); err != nil {
    319 		return nil, err
    320 	}
    321 	return &n, nil
    322 }
    323 
    324 /*
    325 // commented out routine to send e-mails ;)
    326 func SendMail(email string, login string, passwd string, server string) error {
    327 	aserv := strings.Split(server, ":")[0]
    328 	auth := smtp.PlainAuth("", login, passwd, aserv)
    329 	msg := "Hello!"
    330 	msg = "From: noreply@ii-go\n" +
    331 		"To: " + email + "\n" +
    332 		"Subject: Hello there\n\n" +
    333 		msg
    334 	err := smtp.SendMail(server, auth, "noreply@ii-go",[]string{email}, []byte(msg))
    335 	if err != nil {
    336 		Error.Printf("Can't send message to: %s", email)
    337 		return err
    338 	}
    339 	Info.Printf("Sent message to: %s", email)
    340 	return nil
    341 }
    342 */