commit 642d9e76edcb7b239bf37ab78692e6f34e4df8c4
parent bbc52ee5d3930bd95b83affc2f9ceb28ef644e00
Author: Peter Kosyh <p.kosyh@gmail.com>
Date: Tue, 1 Sep 2020 16:45:29 +0300
node prototype
Diffstat:
| A | ii-node/main.go | | | 87 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| M | ii-tool/main.go | | | 15 | +++++++++------ |
| M | ii/db.go | | | 162 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------- |
| M | ii/log.go | | | 8 | ++++---- |
| M | ii/msg.go | | | 9 | +++++++++ |
| M | ii/net.go | | | 33 | +++++++++++++++++---------------- |
6 files changed, 249 insertions(+), 65 deletions(-)
diff --git a/ii-node/main.go b/ii-node/main.go
@@ -0,0 +1,87 @@
+package main
+
+import (
+ "../ii"
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "strings"
+)
+
+func open_db(path string) *ii.DB {
+ db := ii.OpenDB(path)
+ if db == nil {
+ ii.Error.Printf("Can no open db: %s\n", path)
+ os.Exit(1)
+ }
+ return db
+}
+
+func main() {
+ ii.OpenLog(ioutil.Discard, os.Stdout, os.Stderr)
+
+ db_opt := flag.String("db", "./db", "II database path (directory)")
+ db := open_db(*db_opt)
+ flag.Parse()
+ http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+ fmt.Fprintf(w, "%s\n", r.URL.Path)
+ })
+ http.HandleFunc("/list.txt", func(w http.ResponseWriter, r *http.Request) {
+ echoes := db.Echoes()
+ for _, v := range echoes {
+ fmt.Fprintf(w, "%s:%d:\n", v.Name, v.Count)
+ }
+ })
+ http.HandleFunc("/u/m/", func(w http.ResponseWriter, r *http.Request) {
+ ids := strings.Split(r.URL.Path[5:], "/")
+ for _, i := range ids {
+ m := db.GetBundle(i)
+ if m != "" {
+ fmt.Fprintf(w, "%s\n", m)
+ }
+ }
+ })
+ http.HandleFunc("/u/e/", func(w http.ResponseWriter, r *http.Request) {
+ echoes := strings.Split(r.URL.Path[5:], "/")
+ if len(echoes) == 0 {
+ return
+ }
+ slice := echoes[len(echoes)-1:][0]
+ var idx, lim int
+ if _, err := fmt.Sscanf(slice, "%d:%d", &idx, &lim); err == nil {
+ echoes = echoes[:len(echoes)-1]
+ } else {
+ idx, lim = 0, 0
+ }
+
+ for _, e := range echoes {
+ if !ii.IsEcho(e) {
+ continue
+ }
+ fmt.Fprintf(w, "%s\n", e)
+ ids := db.SelectIDS(ii.Query{Echo: e, Start: idx, Lim: lim})
+ for _, id := range ids {
+ fmt.Fprintf(w, "%s\n", id)
+ }
+ }
+ })
+ http.HandleFunc("/m/", func(w http.ResponseWriter, r *http.Request) {
+ id := r.URL.Path[3:]
+ if !ii.IsMsgId(id) {
+ return
+ }
+ m := db.Get(id)
+ ii.Info.Printf("/m/%s %s", id, m)
+ if m != nil {
+ fmt.Fprintf(w, m.String())
+ }
+ })
+ http.HandleFunc("/x/features", func(w http.ResponseWriter, r *http.Request) {
+ fmt.Fprintf(w, "list.txt\nu/e\n")
+ })
+ if err := http.ListenAndServe(":8080", nil); err != nil {
+ ii.Error.Printf("Error running web server: %s", err)
+ }
+}
diff --git a/ii-tool/main.go b/ii-tool/main.go
@@ -3,12 +3,12 @@ package main
import (
"../ii"
"bufio"
- "io/ioutil"
+ "flag"
"fmt"
- "os"
"io"
+ "io/ioutil"
+ "os"
"strings"
- "flag"
)
func open_db(path string) *ii.DB {
@@ -27,6 +27,9 @@ func main() {
lim_opt := flag.Int("lim", 0, "Fetch last N messages")
verbose_opt := flag.Bool("v", false, "Verbose")
flag.Parse()
+ if *verbose_opt {
+ ii.OpenLog(os.Stdout, os.Stdout, os.Stderr)
+ }
args := flag.Args()
if len(args) < 1 {
@@ -117,12 +120,12 @@ Options:
os.Exit(1)
}
db := open_db(*db_opt)
- req := ii.Query { Echo: args[1] }
+ req := ii.Query{Echo: args[1]}
if len(args) > 2 {
fmt.Sscanf(args[2], "%d:%d", &req.Start, &req.Lim)
}
resp := db.SelectIDS(req)
- for _, v := range(resp) {
+ for _, v := range resp {
if *verbose_opt {
fmt.Println(db.Get(v))
} else {
@@ -131,7 +134,7 @@ Options:
}
case "index":
db := open_db(*db_opt)
- if err:= db.CreateIndex(); err != nil {
+ if err := db.CreateIndex(); err != nil {
fmt.Printf("Can not rebuild index: %s\n", err)
os.Exit(1)
}
diff --git a/ii/db.go b/ii/db.go
@@ -2,11 +2,12 @@ package ii
import (
"bufio"
- "path/filepath"
"errors"
"fmt"
"io"
"os"
+ "path/filepath"
+ "sort"
"strings"
"sync"
"time"
@@ -20,8 +21,9 @@ type MsgInfo struct {
}
type Index struct {
- Hash map[string]MsgInfo
- List []string
+ Hash map[string]MsgInfo
+ List []string
+ FileSize int64
}
type DB struct {
@@ -132,7 +134,7 @@ func (db *DB) _CreateIndex() error {
}
defer fidx.Close()
var off int64
- return file_lines(db.BundlePath(), func (line string) bool {
+ return file_lines(db.BundlePath(), func(line string) bool {
msg, _ := DecodeBundle(line)
if msg == nil {
off += int64(len(line) + 1)
@@ -147,20 +149,24 @@ func (db *DB) _CreateIndex() error {
return true
})
}
-
-func (db *DB) LoadIndex() error {
- if db.Idx.Hash != nil { // already loaded
- return nil
+func (db *DB) _ReopenIndex() (*os.File, error) {
+ err := db._CreateIndex()
+ if err != nil {
+ return nil, err
}
-
file, err := os.Open(db.IndexPath())
if err != nil {
+ return nil, err
+ }
+ return file, nil
+}
+func (db *DB) LoadIndex() error {
+ var Idx Index
+ file, err := os.Open(db.IndexPath())
+ if err != nil {
+ db.Idx = Idx
if os.IsNotExist(err) {
- err = db._CreateIndex()
- if err != nil {
- return err
- }
- file, err = os.Open(db.IndexPath())
+ file, err = db._ReopenIndex()
if err != nil {
return err
}
@@ -170,11 +176,33 @@ func (db *DB) LoadIndex() error {
}
defer file.Close()
- var Idx Index
- Idx.Hash = make(map[string]MsgInfo)
- // Idx.List = make([]string)
+ info, err := file.Stat()
+ if err != nil {
+ return err
+ }
+ fsize := info.Size()
+
+ if db.Idx.Hash != nil { // already loaded
+ if fsize > db.Idx.FileSize {
+ Trace.Printf("Refreshing index file...")
+ if _, err := file.Seek(0, 2); err != nil {
+ return err
+ }
+ Idx = db.Idx
+ } else if info.Size() < db.Idx.FileSize {
+ Info.Printf("Index file truncated, rebuild inndex...")
+ file, err = db._ReopenIndex()
+ if err != nil {
+ return err
+ }
+ defer file.Close()
+ }
+ return nil
+ } else {
+ Idx.Hash = make(map[string]MsgInfo)
+ }
var err2 error
- err = f_lines(file, func (line string) bool {
+ err = f_lines(file, func(line string) bool {
info := strings.Split(line, ":")
if len(info) < 3 {
err2 = errors.New("Wrong format")
@@ -200,6 +228,7 @@ func (db *DB) LoadIndex() error {
if err2 != nil {
return err2
}
+ Idx.FileSize = fsize
db.Idx = Idx
return nil
}
@@ -224,7 +253,7 @@ func (db *DB) Lookup(Id string) *MsgInfo {
return db._Lookup(Id)
}
-func (db *DB) Get(Id string) *Msg {
+func (db *DB) GetBundle(Id string) string {
db.Sync.RLock()
defer db.Sync.RUnlock()
db.Lock()
@@ -232,29 +261,46 @@ func (db *DB) Get(Id string) *Msg {
info := db._Lookup(Id)
if info == nil {
- return nil
+ Info.Printf("Can not find bundle: %s\n", Id)
+ return ""
}
f, err := os.Open(db.BundlePath())
if err != nil {
- return nil
+ Error.Printf("Can not open DB: %s\n", err)
+ return ""
}
+ defer f.Close()
_, err = f.Seek(info.Off, 0)
if err != nil {
- return nil
+ Error.Printf("Can not seek DB: %s\n", err)
+ return ""
}
- var m *Msg;
- err = f_lines(f, func (line string)bool {
- m, _ = DecodeBundle(line)
+ var bundle string
+ err = f_lines(f, func(line string) bool {
+ bundle = line
return false
})
if err != nil {
+ Error.Printf("Can not get %s from DB: %s\n", Id, err)
+ return ""
+ }
+ return bundle
+}
+
+func (db *DB) Get(Id string) *Msg {
+ bundle := db.GetBundle(Id)
+ if bundle == "" {
return nil
}
+ m, err := DecodeBundle(bundle)
+ if err != nil {
+ Error.Printf("Can not decode bundle on get: %s\n", Id)
+ }
return m
}
type Query struct {
- Echo string
+ Echo string
Repto string
Start int
Lim int
@@ -267,7 +313,7 @@ func prependStr(x []string, y string) []string {
return x
}
-func (db *DB)Match(info MsgInfo, r Query) bool {
+func (db *DB) Match(info MsgInfo, r Query) bool {
if r.Echo != "" && r.Echo != info.Echo {
return false
}
@@ -277,8 +323,46 @@ func (db *DB)Match(info MsgInfo, r Query) bool {
return true
}
-func (db *DB)SelectIDS(r Query) []string {
- var Resp []string;
+type Echo struct {
+ Name string
+ Count int
+}
+
+func (db *DB) Echoes() []Echo {
+ db.Sync.Lock()
+ defer db.Sync.Unlock()
+ db.Lock()
+ defer db.Unlock()
+ var list []Echo
+
+ if err := db.LoadIndex(); err != nil {
+ return list
+ }
+
+ hash := make(map[string]Echo)
+ size := len(db.Idx.List)
+ for i := 0; i < size; i++ {
+ id := db.Idx.List[i]
+ info := db.Idx.Hash[id]
+ e := info.Echo
+ if v, ok := hash[e]; ok {
+ v.Count++
+ hash[e] = v
+ } else {
+ hash[e] = Echo{Name: e, Count: 1}
+ }
+ }
+ for _, v := range hash {
+ list = append(list, v)
+ }
+ sort.Slice(list, func(i, j int) bool {
+ return list[i].Name < list[j].Name
+ })
+ return list
+}
+
+func (db *DB) SelectIDS(r Query) []string {
+ var Resp []string
db.Sync.Lock()
defer db.Sync.Unlock()
db.Lock()
@@ -322,11 +406,11 @@ func (db *DB)SelectIDS(r Query) []string {
return Resp
}
-func (db *DB)Store(m *Msg) error {
+func (db *DB) Store(m *Msg) error {
return db._Store(m, false)
}
-func (db *DB)Edit(m *Msg) error {
+func (db *DB) Edit(m *Msg) error {
return db._Store(m, true)
}
@@ -351,19 +435,19 @@ func (db *DB) _Store(m *Msg, edit bool) error {
return err
}
- mi := MsgInfo{Id: m.MsgId, Echo: m.Echo, Off: off, Repto: repto}
-
if repto != "" {
repto = ":" + repto
}
- if err := append_file(db.IndexPath(),
- fmt.Sprintf("%s:%s:%d%s", m.MsgId, m.Echo, off, repto)); err != nil {
+ rec := fmt.Sprintf("%s:%s:%d%s", m.MsgId, m.Echo, off, repto)
+ if err := append_file(db.IndexPath(), rec); err != nil {
return err
}
- if _, ok := db.Idx.Hash[m.MsgId]; !ok { // new msg
- db.Idx.List = append(db.Idx.List, m.MsgId)
- }
- db.Idx.Hash[m.MsgId] = mi
+ // if _, ok := db.Idx.Hash[m.MsgId]; !ok { // new msg
+ // db.Idx.List = append(db.Idx.List, m.MsgId)
+ // }
+ // mi := MsgInfo{Id: m.MsgId, Echo: m.Echo, Off: off, Repto: repto}
+ // db.Idx.Hash[m.MsgId] = mi
+ // db.Idx.FileSize += (int64(len(rec) + 1))
return nil
}
diff --git a/ii/log.go b/ii/log.go
@@ -1,14 +1,14 @@
package ii
import (
- "log"
"io"
+ "log"
)
var (
- Trace *log.Logger
- Info *log.Logger
- Error *log.Logger
+ Trace *log.Logger
+ Info *log.Logger
+ Error *log.Logger
)
func OpenLog(trace io.Writer, info io.Writer, error io.Writer) {
diff --git a/ii/msg.go b/ii/msg.go
@@ -34,6 +34,15 @@ func MsgId(msg string) string {
return id[0:20]
}
+func IsMsgId(id string) bool {
+ return len(id) == 20 && !strings.Contains(id, ".")
+}
+
+func IsEcho(e string) bool {
+ l := len(e)
+ return l >= 3 && l <= 120 && strings.Contains(e, ".")
+}
+
func DecodeMsgline(msg string, enc bool) *Msg {
var m Msg
var data []byte
diff --git a/ii/net.go b/ii/net.go
@@ -3,14 +3,14 @@ package ii
import (
"bufio"
"fmt"
- "sync"
"io"
"net/http"
"strings"
+ "sync"
)
type Node struct {
- Host string
+ Host string
Features map[string]bool
}
@@ -50,15 +50,15 @@ func http_get_id(url string) (string, error) {
return res, nil
}
-func (n *Node)Fetcher(db *DB, Echo string, limit int, wait *sync.WaitGroup, cond *sync.Cond) {
- defer func () {
+func (n *Node) Fetcher(db *DB, Echo string, limit int, wait *sync.WaitGroup, cond *sync.Cond) {
+ defer func() {
cond.L.Lock()
cond.Broadcast()
cond.L.Unlock()
}()
defer wait.Done()
if n.IsFeature("u/e") { /* fast path */
- id, err := http_get_id(n.Host + "/u/e/" + Echo +"/-1:1")
+ id, err := http_get_id(n.Host + "/u/e/" + Echo + "/-1:1")
if err == nil && db.Lookup(id) != nil { /* no sync needed */
Info.Printf("%s: no sync needed", Echo)
return
@@ -80,7 +80,7 @@ func (n *Node)Fetcher(db *DB, Echo string, limit int, wait *sync.WaitGroup, cond
if db.Lookup(id) != nil {
break
}
- try ++
+ try++
limit *= 2
}
}
@@ -109,12 +109,12 @@ func (n *Node)Fetcher(db *DB, Echo string, limit int, wait *sync.WaitGroup, cond
var MaxConnections = 6
-func (n *Node)List() ([]string, error) {
+func (n *Node) List() ([]string, error) {
var list []string
if !n.IsFeature("list.txt") {
return list, nil
}
- if err := http_req_lines(n.Host + "/list.txt", func(line string) bool {
+ if err := http_req_lines(n.Host+"/list.txt", func(line string) bool {
list = append(list, strings.Split(line, ":")[0])
return true
}); err != nil {
@@ -123,17 +123,18 @@ func (n *Node)List() ([]string, error) {
return list, nil
}
-func (n *Node)Store(db *DB, ids []string) error {
+func (n *Node) Store(db *DB, ids []string) error {
req := ""
var nreq int
count := len(ids)
+ Trace.Printf("Get and store messages")
for i := 0; i < count; i++ {
req = req + "/" + string(ids[i])
- nreq ++
- if nreq < 8 && i < count - 1 {
+ nreq++
+ if nreq < 8 && i < count-1 {
continue
}
- if err := http_req_lines(n.Host + "/u/m" + req, func(b string) bool {
+ if err := http_req_lines(n.Host+"/u/m"+req, func(b string) bool {
m, e := DecodeBundle(b)
if e != nil {
Error.Printf("Can not decode message %s (%s)\n", b, e)
@@ -152,7 +153,7 @@ func (n *Node)Store(db *DB, ids []string) error {
return nil
}
-func (n *Node)Fetch(db *DB, Echolist []string, limit int) error {
+func (n *Node) Fetch(db *DB, Echolist []string, limit int) error {
if len(Echolist) == 0 {
Echolist, _ = n.List()
}
@@ -163,7 +164,7 @@ func (n *Node)Fetch(db *DB, Echolist []string, limit int) error {
cond := sync.NewCond(&sync.Mutex{})
num := 0
Info.Printf("Start fetcher(s) for %s", n.Host)
- for _, v := range(Echolist) {
+ for _, v := range Echolist {
wait.Add(1)
num += 1
if num >= MaxConnections { /* add per one */
@@ -183,7 +184,7 @@ func (n *Node)Fetch(db *DB, Echolist []string, limit int) error {
return nil
}
-func (n *Node)IsFeature(f string) bool {
+func (n *Node) IsFeature(f string) bool {
_, ok := n.Features[f]
return ok
}
@@ -192,7 +193,7 @@ func Connect(addr string) (*Node, error) {
var n Node
n.Host = strings.TrimSuffix(addr, "/")
n.Features = make(map[string]bool)
- if err := http_req_lines(n.Host + "/x/features", func(line string) bool {
+ if err := http_req_lines(n.Host+"/x/features", func(line string) bool {
n.Features[line] = true
Trace.Printf("%s supports %s", n.Host, line)
return true