net.go (8406B)
// Network operations: fetch, post/get message from point
// Check node extensions.

package ii

import (
	"bufio"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"

	// "net/smtp"
	"net/url"
	"strings"
	"sync"
)

// Node object. Use Connect to create it.
// Host: node URL, used as the prefix of every request
// Features: set of extensions reported by the node (see IsFeature)
// Force: force sync even if the last message is not new
type Node struct {
	Host     string
	Features map[string]bool
	Force    bool
}

// http_req_lines is a utility function that makes a GET request to url and
// calls fn for every line of the response body, with the trailing "\n"
// removed. It stops on EOF or as soon as fn returns false.
// A non-empty final line that is not terminated by "\n" is still passed to fn.
// The HTTP status code is not inspected. The returned error is the request
// error or a read error other than io.EOF.
func http_req_lines(url string, fn func(string) bool) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	reader := bufio.NewReader(resp.Body)
	for {
		line, err := reader.ReadString('\n')
		if err != nil && err != io.EOF {
			return err
		}
		line = strings.TrimSuffix(line, "\n")
		if err == io.EOF {
			if line != "" { /* the node did not send a final \n */
				// fn's result is irrelevant here: the loop ends anyway.
				fn(line)
			}
			break
		}
		if !fn(line) {
			break
		}
	}
	return nil
}

// http_get_id is a short variant of http_req_lines used to obtain a message id.
// Lines containing "." are skipped (presumably echoarea name lines -- confirm);
// all remaining lines are concatenated and returned, so for a one-line reply
// the result is that line. The result is not validated here.
func http_get_id(url string) (string, error) {
	res := ""
	if err := http_req_lines(url, func(line string) bool {
		if strings.Contains(line, ".") {
			return true
		}
		res += line
		return true
	}); err != nil {
		return "", err
	}
	return res, nil
}

// Fetcher internal goroutine.
// DB: db to write
// Echo: echo to fetch
// wait: sync for Fetch master to detect finishing of work
// cond: used for wake-up new goroutines
// Can work in different modes.
// If limit > 0, just fetch last [limit] messages (-limit:limit slice)
// if limit < 0, use adaptive mode, probe (-(2*n)* limit:1) messages
// until find old message.
84 // if node does not support u/e slices, than full sync performed 85 // if node connection is not in Force mode, do not perform sync if not needed 86 func (n *Node) Fetcher(db *DB, Echo string, limit int, wait *sync.WaitGroup, cond *sync.Cond) { 87 defer func() { 88 cond.L.Lock() 89 cond.Broadcast() 90 cond.L.Unlock() 91 }() 92 defer wait.Done() 93 if n.IsFeature("u/e") { /* fast path */ 94 if !n.Force { 95 id, err := http_get_id(n.Host + "/u/e/" + Echo + "/-1:1") 96 if !IsMsgId(id) { 97 Info.Printf("%s: no valid MsgId", Echo) 98 return 99 } 100 if err == nil && db.Exists(id) != nil { /* no sync needed */ 101 Info.Printf("%s: no sync needed", Echo) 102 return 103 } 104 } 105 if limit < 0 { 106 limit = -limit 107 try := 0 108 for { // adaptive 109 if try > 16 { /* fallback to old scheme */ 110 limit = 0 111 break 112 } 113 id, err := http_get_id(fmt.Sprintf("%s/u/e/%s/%d:1", 114 n.Host, Echo, -limit)) 115 if err != nil { /* fallback to old scheme */ 116 limit = 0 117 break 118 } 119 if db.Exists(id) != nil { 120 break 121 } 122 try++ 123 limit *= 2 124 } 125 } 126 } else { 127 limit = 0 128 } 129 req := fmt.Sprintf("%s/u/e/%s", n.Host, Echo) 130 if limit > 0 { 131 req = fmt.Sprintf("%s/%d:%d", req, -limit, limit) 132 } 133 Info.Printf("Get %s", req) 134 var res []string 135 if err := http_req_lines(req, func(line string) bool { 136 if strings.Contains(line, ".") { 137 return true 138 } 139 if db.Exists(line) == nil { 140 res = append(res, line) 141 } 142 return true 143 }); err != nil { 144 return 145 } 146 147 // FIXME: handle this error 148 _ = n.Store(db, res) 149 } 150 151 // Do not run more then MaxConnections goroutines in the same time 152 var MaxConnections = 6 153 154 // Send point message to node using GET method of /u/point scheme. 155 // pauth: secret string. 
msg - raw message in plaintext 156 // returns error 157 func (n *Node) Send(pauth string, msg string) error { 158 msg = base64.URLEncoding.EncodeToString([]byte(msg)) 159 // msg = url.QueryEscape(msg) 160 req := fmt.Sprintf("%s/u/point/%s/%s", n.Host, pauth, msg) 161 resp, err := http.Get(req) 162 Trace.Printf("Get %s", req) 163 if err != nil { 164 return err 165 } 166 buf, err := ioutil.ReadAll(resp.Body) 167 if strings.HasPrefix(string(buf), "msg ok") { 168 Trace.Printf("Server responced msg ok") 169 return nil 170 } else if len(buf) > 0 { 171 err = errors.New(string(buf)) 172 } 173 if err == nil { 174 err = errors.New("Server did not response with ok") 175 } 176 return err 177 } 178 179 // Send point message to node using POST method of /u/point scheme. 180 // pauth: secret string. msg - raw message in plaintext 181 // returns error 182 func (n *Node) Post(pauth string, msg string) error { 183 msg = base64.StdEncoding.EncodeToString([]byte(msg)) 184 // msg = url.QueryEscape(msg) 185 postData := url.Values{ 186 "pauth": {pauth}, 187 "tmsg": {msg}, 188 } 189 resp, err := http.PostForm(n.Host+"/u/point", postData) 190 Trace.Printf("Post %s/u/point", n.Host) 191 if err != nil { 192 return err 193 } 194 buf, err := ioutil.ReadAll(resp.Body) 195 if strings.HasPrefix(string(buf), "msg ok") { 196 Trace.Printf("Server responced msg ok") 197 return nil 198 } else if len(buf) > 0 { 199 err = errors.New(string(buf)) 200 } 201 if err == nil { 202 err = errors.New("Server did not response with ok") 203 } 204 return err 205 } 206 207 // Return list.txt in []string if node supports it. 208 // WARNING: Only echo names are returned! Each string is just echoarea. 209 // Used for fetch all mode. 
210 func (n *Node) List() ([]string, error) { 211 var list []string 212 if !n.IsFeature("list.txt") { 213 return list, nil 214 } 215 if err := http_req_lines(n.Host+"/list.txt", func(line string) bool { 216 list = append(list, strings.Split(line, ":")[0]) 217 return true 218 }); err != nil { 219 return list, err 220 } 221 return list, nil 222 } 223 224 // Fetch and write selected messages in db. 225 // ids: selected message ids. 226 // db: Database. 227 // This function make /u/m request, decodes bundles, checks, 228 // and write them to db (line by line). 229 func (n *Node) Store(db *DB, ids []string) error { 230 req := "" 231 var nreq int 232 count := len(ids) 233 Trace.Printf("Get and store messages") 234 for i := 0; i < count; i++ { 235 req = req + "/" + string(ids[i]) 236 nreq++ 237 if nreq < 8 && i < count-1 { 238 continue 239 } 240 if err := http_req_lines(n.Host+"/u/m"+req, func(b string) bool { 241 m, e := DecodeBundle(b) 242 if e != nil { 243 Error.Printf("Can not decode message %s (%s)\n", b, e) 244 return true 245 } 246 if e := db.Store(m); e != nil { 247 Error.Printf("Can not write message %s (%s)\n", m.MsgId, e) 248 } 249 return true 250 }); err != nil { 251 return err 252 } 253 nreq = 0 254 req = "" 255 } 256 return nil 257 } 258 259 // This is Fetcher master function. It makes fetch from node 260 // and run goroutines in parralel mode (one goroutine per echo). 261 // Echolist: list with echoarea names. If list is empty, 262 // function will try to get list via list.txt request. 263 // limit: see Fetcher function. Describe fetching mode/limit. 
264 func (n *Node) Fetch(db *DB, Echolist []string, limit int) error { 265 if len(Echolist) == 0 { 266 Echolist, _ = n.List() 267 } 268 if Echolist == nil { 269 return nil 270 } 271 var wait sync.WaitGroup 272 cond := sync.NewCond(&sync.Mutex{}) 273 num := 0 274 Info.Printf("Start fetcher(s) for %s", n.Host) 275 for _, v := range Echolist { 276 if !IsEcho(v) { 277 if strings.Trim(v, " ") != "" { 278 Trace.Printf("Skip echo: %s", v) 279 } 280 continue 281 } 282 wait.Add(1) 283 num += 1 284 if num >= MaxConnections { /* add per one */ 285 cond.L.Lock() 286 Trace.Printf("Start fetcher for: %s", v) 287 go n.Fetcher(db, v, limit, &wait, cond) 288 Trace.Printf("Waiting free thread") 289 cond.Wait() 290 cond.L.Unlock() 291 } else { 292 Trace.Printf("Start fetcher for: %s", v) 293 go n.Fetcher(db, v, limit, &wait, cond) 294 } 295 } 296 Trace.Printf("Waiting thread(s)") 297 wait.Wait() 298 return nil 299 } 300 301 // Check if node has feature? 302 // Features are gets while Connect call. 303 func (n *Node) IsFeature(f string) bool { 304 _, ok := n.Features[f] 305 return ok 306 } 307 308 // Connect to node, get features and returns 309 // pointer to Node object. 310 func Connect(addr string) (*Node, error) { 311 var n Node 312 n.Host = strings.TrimSuffix(addr, "/") 313 n.Features = make(map[string]bool) 314 if err := http_req_lines(n.Host+"/x/features", func(line string) bool { 315 n.Features[line] = true 316 Trace.Printf("%s supports %s", n.Host, line) 317 return true 318 }); err != nil { 319 return nil, err 320 } 321 return &n, nil 322 } 323 324 /* 325 // commented out routine to send e-mails ;) 326 func SendMail(email string, login string, passwd string, server string) error { 327 aserv := strings.Split(server, ":")[0] 328 auth := smtp.PlainAuth("", login, passwd, aserv) 329 msg := "Hello!" 
330 msg = "From: noreply@ii-go\n" + 331 "To: " + email + "\n" + 332 "Subject: Hello there\n\n" + 333 msg 334 err := smtp.SendMail(server, auth, "noreply@ii-go",[]string{email}, []byte(msg)) 335 if err != nil { 336 Error.Printf("Can't send message to: %s", email) 337 return err 338 } 339 Info.Printf("Sent message to: %s", email) 340 return nil 341 } 342 */