openidec

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

commit bbc52ee5d3930bd95b83affc2f9ceb28ef644e00
parent 10d006046fef2ec5b1afc0671c91372be2881e6f
Author: Peter Kosyh <p.kosyh@gmail.com>
Date:   Tue,  1 Sep 2020 12:45:26 +0300

first draft

Diffstat:
Aii-tool/main.go | 143+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aii/db.go | 379+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aii/db_test.go | 69+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aii/log.go | 18++++++++++++++++++
Aii/msg.go | 211+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aii/msg_test.go | 65+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aii/net.go | 203+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
7 files changed, 1088 insertions(+), 0 deletions(-)

diff --git a/ii-tool/main.go b/ii-tool/main.go @@ -0,0 +1,143 @@ +package main + +import ( + "../ii" + "bufio" + "io/ioutil" + "fmt" + "os" + "io" + "strings" + "flag" +) + +func open_db(path string) *ii.DB { + db := ii.OpenDB(path) + if db == nil { + fmt.Printf("Can no open db: %s\n", path) + os.Exit(1) + } + return db +} + +func main() { + ii.OpenLog(ioutil.Discard, os.Stdout, os.Stderr) + + db_opt := flag.String("db", "./db", "II database path (directory)") + lim_opt := flag.Int("lim", 0, "Fetch last N messages") + verbose_opt := flag.Bool("v", false, "Verbose") + flag.Parse() + + args := flag.Args() + if len(args) < 1 { + fmt.Printf(`Help: %s [options] command [arguments] +Commands: + fetch <url> - fetch + store <bundle|-> - import bundle to database + get <msgid> - show message from database + select <echo> [[start]:lim] - get slice from echo + index - recreate index +Options: + -db=<path> - database path + -lim=<lim> - fetch lim last messages +`, os.Args[0]) + os.Exit(1) + } + switch cmd := args[0]; cmd { + case "fetch": + if len(args) < 2 { + fmt.Printf("No url supplied\n") + os.Exit(1) + } + db := open_db(*db_opt) + n, err := ii.Connect(args[1]) + if err != nil { + fmt.Printf("Can not connect to %s: %s\n", args[1], err) + os.Exit(1) + } + err = n.Fetch(db, nil, *lim_opt) + if err != nil { + fmt.Printf("Can not fetch from %s: %s\n", args[1], err) + os.Exit(1) + } + case "store": + if len(args) < 2 { + fmt.Printf("No bundle file supplied\n") + os.Exit(1) + } + db := open_db(*db_opt) + var f *os.File + var err error + if args[1] == "-" { + f = os.Stdin + } else { + f, err = os.Open(args[1]) + } + if err != nil { + fmt.Printf("Can no open bundle: %s\n", args[1]) + os.Exit(1) + } + defer f.Close() + reader := bufio.NewReader(f) + for { + line, err := reader.ReadString('\n') + if err != nil && err != io.EOF { + fmt.Printf("Can read input (%s)\n", err) + os.Exit(1) + } + line = strings.TrimSuffix(line, "\n") + if err == io.EOF { + break + } + m, err := ii.DecodeBundle(line) + if m == nil { + fmt.Printf("Can not parse message: %s (%s)\n", line, err) + continue + } + if db.Lookup(m.MsgId) == nil { + if err := db.Store(m); err != nil { + fmt.Printf("Can not store message: %s\n", err) + os.Exit(1) + } + } + } + case "get": + if len(args) < 2 { + fmt.Printf("No msgid supplied\n") + os.Exit(1) + } + db := open_db(*db_opt) + m := db.Get(args[1]) + if m != nil { + fmt.Println(m) + } + case "select": + if len(args) < 2 { + fmt.Printf("No echo supplied\n") + os.Exit(1) + } + db := open_db(*db_opt) + req := ii.Query { Echo: args[1] } + if len(args) > 2 { + fmt.Sscanf(args[2], "%d:%d", &req.Start, &req.Lim) + } + resp := db.SelectIDS(req) + for _, v := range(resp) { + if *verbose_opt { + fmt.Println(db.Get(v)) + } else { + fmt.Println(v) + } + } + case "index": + db := open_db(*db_opt) + if err:= db.CreateIndex(); err != nil { + fmt.Printf("Can not rebuild index: %s\n", err) + os.Exit(1) + } + default: + fmt.Printf("Wrong cmd: %s\n", cmd) + os.Exit(1) + } + os.Exit(0) +} diff --git a/ii/db.go b/ii/db.go @@ -0,0 +1,379 @@ +package ii + +import ( + "bufio" + "path/filepath" + "errors" + "fmt" + "io" + "os" + "strings" + "sync" + "time" +) + +type MsgInfo struct { + Id string + Echo string + Off int64 + Repto string +} + +type Index struct { + Hash map[string]MsgInfo + List []string +} + +type DB struct { + Path string + Idx Index + Sync sync.RWMutex +} + +func mkdir(path string) { + if _, err := os.Stat(path); os.IsNotExist(err) { + } +} + +func append_file(fn string, text string) error { + f, err := os.OpenFile(fn, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + if _, err := f.WriteString(text + "\n"); err != nil { + return err + } + return nil +} + +func (db *DB) Lock() bool { + try := 16 + for try > 0 { + if err := os.Mkdir(db.LockPath(), 0777); err == nil || os.IsExist(err) { + return true + } + time.Sleep(time.Second) + try -= 1 + } + return false +} + +func (db *DB) Unlock() { + os.Remove(db.LockPath()) +} + +func (db *DB) IndexPath() string { + return fmt.Sprintf("%s.idx", db.Path) +} + +func (db *DB) BundlePath() string { + return fmt.Sprintf("%s", db.Path) +} + +func (db *DB) LockPath() string { + pat := strings.Replace(db.Path, "/", "_", -1) + return fmt.Sprintf("%s/%s-bundle.lock", os.TempDir(), pat) +} + +// var MaxMsgLen int = 128 * 1024 * 1024 + +func (db *DB) CreateIndex() error { + db.Sync.Lock() + defer db.Sync.Unlock() + db.Lock() + defer db.Unlock() + + return db._CreateIndex() +} +func file_lines(path string, fn func(string) bool) error { + f, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + defer f.Close() + return f_lines(f, fn) +} + +func f_lines(f *os.File, fn func(string) bool) error { + reader := bufio.NewReader(f) + for { + line, err := reader.ReadString('\n') + if err != nil && err != io.EOF { + return err + } + line = strings.TrimSuffix(line, "\n") + if err == io.EOF { + break + } + if !fn(line) { + break + } + } + // scanner := bufio.NewScanner(f) + // scanner.Buffer(make([]byte, MaxMsgLen), MaxMsgLen) + + // for scanner.Scan() { + // line := scanner.Text() + // if !fn(line) { + // break + // } + // } + return nil +} + +func (db *DB) _CreateIndex() error { + fidx, err := os.OpenFile(db.IndexPath(), os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer fidx.Close() + var off int64 + return file_lines(db.BundlePath(), func (line string) bool { + msg, _ := DecodeBundle(line) + if msg == nil { + off += int64(len(line) + 1) + return true + } + repto, _ := msg.Tag("repto") + if repto != "" { + repto = ":" + repto + } + fidx.WriteString(fmt.Sprintf("%s:%s:%d%s\n", msg.MsgId, msg.Echo, off, repto)) + off += int64(len(line) + 1) + return true + }) +} + +func (db *DB) LoadIndex() error { + if db.Idx.Hash != nil { // already loaded + return nil + } + + file, err := os.Open(db.IndexPath()) + if err != nil { + if os.IsNotExist(err) { + err = db._CreateIndex() + if err != nil { + return err + } + file, err = os.Open(db.IndexPath()) + if err != nil { + return err + } + } else { + return err + } + } + defer file.Close() + + var Idx Index + Idx.Hash = make(map[string]MsgInfo) + // Idx.List = make([]string) + var err2 error + err = f_lines(file, func (line string) bool { + info := strings.Split(line, ":") + if len(info) < 3 { + err2 = errors.New("Wrong format") + return false + } + mi := MsgInfo{Id: info[0], Echo: info[1]} + if _, err := fmt.Sscanf(info[2], "%d", &mi.Off); err != nil { + err2 = errors.New("Wrong offset") + return false + } + if len(info) > 3 { + mi.Repto = info[3] + } + if _, ok := Idx.Hash[mi.Id]; !ok { // new msg + Idx.List = append(Idx.List, mi.Id) + } + Idx.Hash[mi.Id] = mi + return true + }) + if err != nil { + return err + } + if err2 != nil { + return err2 + } + db.Idx = Idx + return nil +} + +func (db *DB) _Lookup(Id string) *MsgInfo { + if err := db.LoadIndex(); err != nil { + return nil + } + info, ok := db.Idx.Hash[Id] + if !ok { + return nil + } + return &info +} + +func (db *DB) Lookup(Id string) *MsgInfo { + db.Sync.RLock() + defer db.Sync.RUnlock() + db.Lock() + defer db.Unlock() + + return db._Lookup(Id) +} + +func (db *DB) Get(Id string) *Msg { + db.Sync.RLock() + defer db.Sync.RUnlock() + db.Lock() + defer db.Unlock() + + info := db._Lookup(Id) + if info == nil { + return nil + } + f, err := os.Open(db.BundlePath()) + if err != nil { + return nil + } + _, err = f.Seek(info.Off, 0) + if err != nil { + return nil + } + var m *Msg; + err = f_lines(f, func (line string)bool { + m, _ = DecodeBundle(line) + return false + }) + if err != nil { + return nil + } + return m +} + +type Query struct { + Echo string + Repto string + Start int + Lim int +} + +func prependStr(x []string, y string) []string { + x = append(x, "") + copy(x[1:], x) + x[0] = y + return x +} + +func (db *DB)Match(info MsgInfo, r Query) bool { + if r.Echo != "" && r.Echo != info.Echo { + return false + } + if r.Repto != "" && r.Repto != info.Repto { + return false + } + return true +} + +func (db *DB)SelectIDS(r Query) []string { + var Resp []string; + db.Sync.Lock() + defer db.Sync.Unlock() + db.Lock() + defer db.Unlock() + + if err := db.LoadIndex(); err != nil { + return Resp + } + size := len(db.Idx.List) + if size == 0 { + return Resp + } + if r.Start < 0 { + start := 0 + for i := size - 1; i >= 0; i-- { + id := db.Idx.List[i] + if db.Match(db.Idx.Hash[id], r) { + Resp = prependStr(Resp, id) + start -= 1 + if start == r.Start { + break + } + } + } + if r.Lim > 0 && len(Resp) > r.Lim { + Resp = Resp[0:r.Lim] + } + return Resp + } + found := 0 + for i := r.Start; i < size; i++ { + id := db.Idx.List[i] + if db.Match(db.Idx.Hash[id], r) { + Resp = append(Resp, id) + found += 1 + if r.Lim > 0 && found == r.Lim { + break + } + } + } + return Resp +} + +func (db *DB)Store(m *Msg) error { + return db._Store(m, false) +} + +func (db *DB)Edit(m *Msg) error { + return db._Store(m, true) +} + +func (db *DB) _Store(m *Msg, edit bool) error { + db.Sync.Lock() + defer db.Sync.Unlock() + db.Lock() + defer db.Unlock() + if _, ok := db.Idx.Hash[m.MsgId]; ok && !edit { // exist and not edit + return errors.New("Already exists") + } + repto, _ := m.Tag("repto") + if err := db.LoadIndex(); err != nil { + return err + } + fi, err := os.Stat(db.BundlePath()) + var off int64 + if err == nil { + off = fi.Size() + } + if err := append_file(db.BundlePath(), m.Encode()); err != nil { + return err + } + + mi := MsgInfo{Id: m.MsgId, Echo: m.Echo, Off: off, Repto: repto} + + if repto != "" { + repto = ":" + repto + } + if err := append_file(db.IndexPath(), + fmt.Sprintf("%s:%s:%d%s", m.MsgId, m.Echo, off, repto)); err != nil { + return err + } + if _, ok := db.Idx.Hash[m.MsgId]; !ok { // new msg + db.Idx.List = append(db.Idx.List, m.MsgId) + } + db.Idx.Hash[m.MsgId] = mi + return nil +} + +func OpenDB(path string) *DB { + var db DB + db.Path = path + info, err := os.Stat(filepath.Dir(path)) + if err != nil || !info.IsDir() { + return nil + } + // db.Idx = make(map[string]Index) + return &db +} diff --git a/ii/db_test.go b/ii/db_test.go @@ -0,0 +1,69 @@ +package ii + +import ( + "io/ioutil" + "os" + "testing" +) + +func TestOpenDB(t *testing.T) { + var db *DB + dir, err := ioutil.TempDir(os.TempDir(), "ii.test.*") + if err != nil { + t.Error("Can not create temp dir") + return + } + defer os.RemoveAll(dir) + path := dir + "/db" + db = OpenDB(path) + if db == nil { + t.Error("Can not open db") + return + } + var m *Msg + if m, err = DecodeBundle(Test_msg); m == nil { + t.Error("Can not decode msg", err) + return + } + if err := db.Store(m); err != nil { + t.Error("Can not save msg", err) + return + } + m2 := db.Get(m.MsgId) + if m2 == nil || m2.Text != m.Text { + t.Error("Can not lookup msg") + return + } + + os.Remove(db.IndexPath()) + + db = OpenDB(path) // reopen + m2 = db.Get(m.MsgId) + if m2 == nil || m2.Text != m.Text { + t.Error("Can not lookup msg (create new index)") + return + } + m2.Text = "Edited" + if err := db.Edit(m2); err != nil { + t.Error("Can not save duplicate msg", err) + return + } + m3 := db.Get(m2.MsgId) + if m3 == nil || m3.Text != m2.Text { + t.Error("Can not lookup msg (edited)") + return + } + db = OpenDB(path) // reopen + m3 = db.Get(m.MsgId) + if m3 == nil || m3.Text != m2.Text { + t.Error("Can not lookup msg (reopen, edited msg)", m3.Text) + return + } + os.Remove(db.IndexPath()) + db = OpenDB(path) // reopen + m3 = db.Get(m.MsgId) + if m3 == nil || m3.Text != m2.Text { + t.Error("Can not lookup msg (create new index, edited msg)", m3.Text) + return + } +} diff --git a/ii/log.go b/ii/log.go @@ -0,0 +1,18 @@ +package ii + +import ( + "log" + "io" +) + +var ( + Trace *log.Logger + Info *log.Logger + Error *log.Logger +) + +func OpenLog(trace io.Writer, info io.Writer, error io.Writer) { + Trace = log.New(trace, "=== ", log.Ldate|log.Ltime) + Info = log.New(info, "INFO: ", log.Ldate|log.Ltime) + Error = log.New(error, "ERR: ", log.Ldate|log.Ltime) +} diff --git a/ii/msg.go b/ii/msg.go @@ -0,0 +1,211 @@ +package ii + +import ( + "crypto/sha256" + "encoding/base64" + "errors" + "fmt" + "strings" + "time" +) + +type Tags struct { + Hash map[string]string + List []string +} + +type Msg struct { + MsgId string + Tags Tags + Echo string + Date int64 + From string + Addr string + To string + Subj string + Text string +} + +func MsgId(msg string) string { + h := sha256.Sum256([]byte(msg)) + id := base64.StdEncoding.EncodeToString(h[:]) + id = strings.Replace(id, "+", "A", -1) + id = strings.Replace(id, "/", "Z", -1) + return id[0:20] +} + +func DecodeMsgline(msg string, enc bool) *Msg { + var m Msg + var data []byte + var err error + if enc { + data, err = base64.StdEncoding.DecodeString(msg) + if err != nil { + return nil + } + } else { + data = []byte(msg) + } + text := strings.Split(string(data), "\n") + if len(text) <= 6 { + return nil + } + if text[3] != "" { + return nil + } + m.Echo = text[0] + m.To = text[1] + m.Subj = text[2] + m.Date = time.Now().Unix() + start := 4 + repto := text[4] + m.Tags, _ = MakeTags("ii/ok") + + if strings.HasPrefix(repto, "@Repto:") || strings.HasPrefix(repto, "@repto:") { + start += 1 + m.Tags.Add("repto/" + strings.Trim(strings.Split(repto, ":")[1], " ")) + } + for i := start; i < len(text); i++ { + m.Text += text[i] + "\n" + } + m.Text = strings.TrimSuffix(m.Text, "\n") + return &m +} + +func DecodeBundle(msg string) (*Msg, error) { + var m Msg + if strings.Contains(msg, ":") { + spl := strings.Split(msg, ":") + if len(spl) != 2 { + return nil, errors.New("Wrong bundle format") + } + msg = spl[1] + m.MsgId = spl[0] + } + data, err := base64.StdEncoding.DecodeString(msg) + if err != nil { + return nil, err + } + if m.MsgId == "" { + m.MsgId = MsgId(string(data)) + } + text := strings.Split(string(data), "\n") + if len(text) <= 8 { + return nil, errors.New("Wrong message format") + } + m.Tags, err = MakeTags(text[0]) + if err != nil { + return nil, err + } + m.Echo = text[1] + _, err = fmt.Sscanf(text[2], "%d", &m.Date) + if err != nil { + return nil, err + } + m.From = text[3] + m.Addr = text[4] + m.To = text[5] + m.Subj = text[6] + for i := 8; i < len(text); i++ { + m.Text += text[i] + "\n" + } + m.Text = strings.TrimSuffix(m.Text, "\n") + return &m, nil +} + +func MakeTags(str string) (Tags, error) { + var t Tags + str = strings.Trim(str, " ") + if str == "" { // empty + return t, nil + } + tags := strings.Split(str, "/") + if len(tags)%2 != 0 { + return t, errors.New("Wrong tags: " + str) + } + t.Hash = make(map[string]string) + for i := 0; i < len(tags); i += 2 { + t.Hash[tags[i]] = tags[i+1] + t.List = append(t.List, tags[i]) + } + return t, nil +} + +func NewTags(str string) Tags { + t, _ := MakeTags(str) + return t +} + +func (t Tags) Add(str string) error { + tags := strings.Split(str, "/") + if len(tags)%2 != 0 { + return errors.New("Wrong tags") + } + for i := 0; i < len(tags); i += 2 { + t.Hash[tags[i]] = tags[i+1] + t.List = append(t.List, tags[i]) + } + return nil +} + +func (t Tags) String() string { + var text string + if t.Hash == nil { + return "" + } + for _, n := range t.List { + if val, ok := t.Hash[n]; ok { + text += fmt.Sprintf("%s/%s/", n, val) + } + } + text = strings.TrimSuffix(text, "/") + return text +} + +func (m *Msg) Dump() string { + if m == nil { + return "" + } + return fmt.Sprintf("id: %s\ntags: %s\nechoarea: %s\ndate: %s\nmsgfrom: %s\naddr: %s\nmsgto: %s\nsubj: %s\n\n%s", + m.MsgId, m.Tags.String(), m.Echo, time.Unix(m.Date, 0), m.From, m.Addr, m.To, m.Subj, m.Text) +} + +func (m *Msg) Tag(n string) (string, bool) { + if m == nil || m.Tags.Hash == nil { + return "", false + } + v, ok := m.Tags.Hash[n] + if ok { + return v, true + } + return "", false +} + +func (m *Msg) String() string { + tags := m.Tags.String() + text := strings.Join([]string{tags, m.Echo, + fmt.Sprint(m.Date), + m.From, + m.Addr, + m.To, + m.Subj, + "", + m.Text}, "\n") + return text +} + +func (m *Msg) Encode() string { + var text string + if m == nil || m.Echo == "" { + return "" + } + if m.Date == 0 { + now := time.Now() + m.Date = now.Unix() + } + text = m.String() + if m.MsgId == "" { + m.MsgId = MsgId(text) + } + return m.MsgId + ":" + base64.StdEncoding.EncodeToString([]byte(text)) +} diff --git a/ii/msg_test.go b/ii/msg_test.go @@ -0,0 +1,65 @@ +package ii + +import ( + "encoding/base64" + "fmt" + "testing" +) + +var Test_msg string = "a5OX4lC8uB8OIzzzGQ5B:" + + "aWkvb2svcmVwdG8va2N3UlBEQWNuNkxsQlVRWVhMY0sKc3RkLmdhbWUKMTU5ODE5NjE1MQ" + + "pQZXRlcgpzeXNjYWxsLDEKdzIwMTQwMwpSZTog0JvQuNC00LjRjyDigJQg0L3QtSDQvNC+0LPRgyDQv9GA0L7QudGC0Lg" + + "g0LTQsNC70YzRiNC1LiDQntGI0LjQsdC60LA/Cgo+INCT0LTQtSDQvNGLINGB0LXQudGH0LDRgSDQvtCx0YHRg9C20LTQ" + + "sNC10Lwg0L7RiNC40LHQutC4INCyINC40LPRgNCw0YU/DQrQnNC+0LbQvdC+INC90LAg0YTQvtGA0YPQvNC1IGh0dHA6L" + + "y9pbnN0ZWFkLWdhbWVzLnJ1INC40LvQuCDQsiDQutCw0YDRgtC+0YfQutC1INC40LPRgNGLLCDQuNC70Lgg0LfQtNC10Y" + + "HRjC4uLiDQkiDQu9GO0LHQvtC8INGB0LvRg9GH0LDQtSwg0L3Rg9C20LXQvSBzYXZlINC4INC+0L/QuNGB0LDQvdC40LU" + + "g0YHQuNGC0YPQsNGG0LjQuC4NCg0KUC5TPiDQmNCz0YDQsCDRgtC+0YfQvdC+INC/0YDQvtGF0L7QtNC40LzQsCwg0L3Q" + + "tSDRgtCw0Log0LTQsNCy0L3QviDQtdGRINC/0YDQvtGI0LvQviDQvdC10YHQutC+0LvRjNC60L4g0YfQtdC70L7QstC10" + + "LouINCd0L4sINC60L7QvdC10YfQvdC+LCDQsdCw0LPQuCDQvNC+0LPRg9GCINCx0YvRgtGMLg==" + +func TestParse(t *testing.T) { + var m *Msg + m, _ = DecodeBundle(Test_msg) + if m == nil { + t.Error("Can not decode msg") + } + text := m.MsgId + ":" + m.Encode() + decoded, _ := base64.StdEncoding.DecodeString(text) + decoded2, _ := base64.StdEncoding.DecodeString(Test_msg) + if string(decoded) != string(decoded2) { + t.Error("Encoded not as etalon") + } + fmt.Println(m.String()) +} + +func TestMsgline(t *testing.T) { + var m *Msg + m = DecodeMsgline(`test.area +All +hello world! + +@repto: 12345678901234567890 +This is my +message! + +wbr, Anonymous! +`, false) + if m == nil { + t.Error("Can not decode msg") + } +} +func TestMake(t *testing.T) { + m := Msg{ + Tags: NewTags("ii/ok/repto/aaaaa"), + Echo: "test.echo", + Text: "Hello world!", + } + msg := m.Encode() + if msg == "" { + t.Error("Can not encode msg") + } + m2, _ := DecodeBundle(msg) + if m2 == nil { + t.Error("Can not decode encoded msg") + } +} diff --git a/ii/net.go b/ii/net.go @@ -0,0 +1,203 @@ +package ii + +import ( + "bufio" + "fmt" + "sync" + "io" + "net/http" + "strings" +) + +type Node struct { + Host string + Features map[string]bool +} + +func http_req_lines(url string, fn func(string) bool) error { + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + reader := bufio.NewReader(resp.Body) + for { + line, err := reader.ReadString('\n') + if err != nil && err != io.EOF { + return err + } + line = strings.TrimSuffix(line, "\n") + if err == io.EOF { + break + } + if !fn(line) { + break + } + } + return nil +} +func http_get_id(url string) (string, error) { + res := "" + if err := http_req_lines(url, func(line string) bool { + if strings.Contains(line, ".") { + return true + } + res += line + return true + }); err != nil { + return "", err + } + return res, nil +} + +func (n *Node)Fetcher(db *DB, Echo string, limit int, wait *sync.WaitGroup, cond *sync.Cond) { + defer func () { + cond.L.Lock() + cond.Broadcast() + cond.L.Unlock() + }() + defer wait.Done() + if n.IsFeature("u/e") { /* fast path */ + id, err := http_get_id(n.Host + "/u/e/" + Echo +"/-1:1") + if err == nil && db.Lookup(id) != nil { /* no sync needed */ + Info.Printf("%s: no sync needed", Echo) + return + } + if limit < 0 { + limit = -limit + try := 0 + for { // adaptive + if try > 16 { /* fallback to old scheme */ + limit = 0 + break + } + id, err := http_get_id(fmt.Sprintf("%s/u/e/%s/%d:1", + n.Host, Echo, -limit)) + if err != nil { /* fallback to old scheme */ + limit = 0 + break + } + if db.Lookup(id) != nil { + break + } + try ++ + limit *= 2 + } + } + } else { + limit = 0 + } + req := fmt.Sprintf("%s/u/e/%s", n.Host, Echo) + if limit > 0 { + req = fmt.Sprintf("%s/%d:%d", req, -limit, limit) + } + Info.Printf("Get %s", req) + var res []string + if err := http_req_lines(req, func(line string) bool { + if strings.Contains(line, ".") { + return true + } + if db.Lookup(line) == nil { + res = append(res, line) + } + return true + }); err != nil { + return + } + n.Store(db, res) +} + +var MaxConnections = 6 + +func (n *Node)List() ([]string, error) { + var list []string + if !n.IsFeature("list.txt") { + return list, nil + } + if err := http_req_lines(n.Host + "/list.txt", func(line string) bool { + list = append(list, strings.Split(line, ":")[0]) + return true + }); err != nil { + return list, err + } + return list, nil +} + +func (n *Node)Store(db *DB, ids []string) error { + req := "" + var nreq int + count := len(ids) + for i := 0; i < count; i++ { + req = req + "/" + string(ids[i]) + nreq ++ + if nreq < 8 && i < count - 1 { + continue + } + if err := http_req_lines(n.Host + "/u/m" + req, func(b string) bool { + m, e := DecodeBundle(b) + if e != nil { + Error.Printf("Can not decode message %s (%s)\n", b, e) + return true + } + if e := db.Store(m); e != nil { + Error.Printf("Can not write message %s (%s)\n", m.MsgId, e) + } + return true + }); err != nil { + return err + } + nreq = 0 + req = "" + } + return nil +} + +func (n *Node)Fetch(db *DB, Echolist []string, limit int) error { + if len(Echolist) == 0 { + Echolist, _ = n.List() + } + if Echolist == nil { + return nil + } + var wait sync.WaitGroup + cond := sync.NewCond(&sync.Mutex{}) + num := 0 + Info.Printf("Start fetcher(s) for %s", n.Host) + for _, v := range(Echolist) { + wait.Add(1) + num += 1 + if num >= MaxConnections { /* add per one */ + cond.L.Lock() + Trace.Printf("Start fetcher for: %s", v) + go n.Fetcher(db, v, limit, &wait, cond) + Trace.Printf("Waiting free thread") + cond.Wait() + cond.L.Unlock() + } else { + Trace.Printf("Start fetcher for: %s", v) + go n.Fetcher(db, v, limit, &wait, cond) + } + } + Trace.Printf("Waiting thread(s)") + wait.Wait() + return nil +} + +func (n *Node)IsFeature(f string) bool { + _, ok := n.Features[f] + return ok +} + +func Connect(addr string) (*Node, error) { + var n Node + n.Host = strings.TrimSuffix(addr, "/") + n.Features = make(map[string]bool) + if err := http_req_lines(n.Host + "/x/features", func(line string) bool { + n.Features[line] = true + Trace.Printf("%s supports %s", n.Host, line) + return true + }); err != nil { + return nil, err + } + return &n, nil +}