more thread safety

This commit is contained in:
Niko Abeler 2022-09-10 13:44:25 +02:00
parent d66c1a6817
commit 4b9a5adf5c
5 changed files with 194 additions and 135 deletions

View File

@ -16,12 +16,12 @@
</div> </div>
<hr> <hr>
{{if .Post.ApprovedWebmentions}} {{if .Post.ApprovedIncomingWebmentions}}
<h3> <h3>
Webmentions Webmentions
</h3> </h3>
<ul> <ul>
{{range .Post.ApprovedWebmentions}} {{range .Post.ApprovedIncomingWebmentions}}
<li> <li>
<a href="{{.Source}}"> <a href="{{.Source}}">
{{if .Title}} {{if .Title}}

View File

@ -28,6 +28,27 @@ func (*MockHtmlParser) GetWebmentionEndpoint(resp *http.Response) (string, error
} }
type MockParseLinksHtmlParser struct {
Links []string
}
func (*MockParseLinksHtmlParser) ParseHEntry(resp *http.Response) (owl.ParsedHEntry, error) {
return owl.ParsedHEntry{Title: "Mock Title"}, nil
}
func (parser *MockParseLinksHtmlParser) ParseLinks(resp *http.Response) ([]string, error) {
return parser.Links, nil
}
func (parser *MockParseLinksHtmlParser) ParseLinksFromString(string) ([]string, error) {
return parser.Links, nil
}
func (*MockParseLinksHtmlParser) GetWebmentionEndpoint(resp *http.Response) (string, error) {
return "http://example.com/webmention", nil
}
type MockHttpClient struct{} type MockHttpClient struct{}
func (*MockHttpClient) Get(url string) (resp *http.Response, err error) { func (*MockHttpClient) Get(url string) (resp *http.Response, err error) {

176
post.go
View File

@ -88,42 +88,6 @@ func (post *Post) Content() []byte {
return data return data
} }
func (post *Post) Webmentions() PostWebmetions {
// read status file
// return parsed webmentions
fileName := post.WebmentionsFile()
if !fileExists(fileName) {
return PostWebmetions{}
}
data, err := os.ReadFile(fileName)
if err != nil {
return PostWebmetions{}
}
webmentions := PostWebmetions{}
err = yaml.Unmarshal(data, &webmentions)
if err != nil {
return PostWebmetions{}
}
return webmentions
}
func (post *Post) PersistIncomingWebmentions(webmentions PostWebmetions) error {
data, err := yaml.Marshal(webmentions)
if err != nil {
return err
}
err = os.WriteFile(post.WebmentionsFile(), data, 0644)
if err != nil {
return err
}
return nil
}
func (post *Post) RenderedContent() bytes.Buffer { func (post *Post) RenderedContent() bytes.Buffer {
data := post.Content() data := post.Content()
@ -192,6 +156,51 @@ func (post *Post) LoadMeta() error {
return nil return nil
} }
// Webmentions returns list of incoming and outgoing webmentions
func (post *Post) Webmentions() PostWebmetions {
// read status file
// return parsed webmentions
fileName := post.WebmentionsFile()
if !fileExists(fileName) {
return PostWebmetions{}
}
data, err := os.ReadFile(fileName)
if err != nil {
return PostWebmetions{}
}
webmentions := PostWebmetions{}
err = yaml.Unmarshal(data, &webmentions)
if err != nil {
return PostWebmetions{}
}
return webmentions
}
func (post *Post) IncomingWebmentions() []WebmentionIn {
return post.Webmentions().Incoming
}
func (post *Post) OutgoingWebmentions() []WebmentionOut {
return post.Webmentions().Outgoing
}
func (post *Post) persistWebmentions(webmentions PostWebmetions) error {
data, err := yaml.Marshal(webmentions)
if err != nil {
return err
}
err = os.WriteFile(post.WebmentionsFile(), data, 0644)
if err != nil {
return err
}
return nil
}
func (post *Post) PersistIncomingWebmention(webmention WebmentionIn) error { func (post *Post) PersistIncomingWebmention(webmention WebmentionIn) error {
wms := post.Webmentions() wms := post.Webmentions()
@ -209,10 +218,39 @@ func (post *Post) PersistIncomingWebmention(webmention WebmentionIn) error {
wms.Incoming = append(wms.Incoming, webmention) wms.Incoming = append(wms.Incoming, webmention)
} }
return post.PersistIncomingWebmentions(wms) return post.persistWebmentions(wms)
} }
func (post *Post) getIncomingWebmention(source string) (WebmentionIn, error) { // PersistOutgoingWebmention persists a webmention to the webmention file.
// If `newLink` is true, the webmention is only persisted if it is not already in the webmention file.
func (post *Post) PersistOutgoingWebmention(webmention *WebmentionOut, newLink bool) error {
post.wmLock.Lock()
defer post.wmLock.Unlock()
wms := post.Webmentions()
// if target is not in webmention, add it
replaced := false
for i, t := range wms.Outgoing {
if t.Target == webmention.Target {
// if newLink is true, only replace if the link is new
if newLink {
return nil
}
wms.Outgoing[i] = *webmention
replaced = true
break
}
}
if !replaced {
wms.Outgoing = append(wms.Outgoing, *webmention)
}
return post.persistWebmentions(wms)
}
func (post *Post) getIncomingWebmentionBySource(source string) (WebmentionIn, error) {
wms := post.Webmentions() wms := post.Webmentions()
for _, wm := range wms.Incoming { for _, wm := range wms.Incoming {
if wm.Source == source { if wm.Source == source {
@ -227,7 +265,7 @@ func (post *Post) AddIncomingWebmention(source string) error {
defer post.wmLock.Unlock() defer post.wmLock.Unlock()
// Check if file already exists // Check if file already exists
_, err := post.getIncomingWebmention(source) _, err := post.getIncomingWebmentionBySource(source)
if err != nil { if err != nil {
wms := post.Webmentions() wms := post.Webmentions()
wms.Incoming = append(wms.Incoming, WebmentionIn{ wms.Incoming = append(wms.Incoming, WebmentionIn{
@ -236,55 +274,18 @@ func (post *Post) AddIncomingWebmention(source string) error {
defer func() { defer func() {
go post.EnrichWebmention(source) go post.EnrichWebmention(source)
}() }()
return post.PersistIncomingWebmentions(wms) return post.persistWebmentions(wms)
} }
return nil return nil
} }
func (post *Post) addOutgoingWebmention(target string) error {
wms := post.Webmentions()
// Check if file already exists
for _, wm := range wms.Outgoing {
if wm.Target == target {
return nil
}
}
webmention := WebmentionOut{
Target: target,
}
wms.Outgoing = append(wms.Outgoing, webmention)
return post.PersistIncomingWebmentions(wms)
}
func (post *Post) UpdateOutgoingWebmention(webmention *WebmentionOut) error {
wms := post.Webmentions()
// if target is not in status, add it
replaced := false
for i, t := range wms.Outgoing {
if t.Target == webmention.Target {
wms.Outgoing[i] = *webmention
replaced = true
break
}
}
if !replaced {
wms.Outgoing = append(wms.Outgoing, *webmention)
}
return post.PersistIncomingWebmentions(wms)
}
func (post *Post) EnrichWebmention(source string) error { func (post *Post) EnrichWebmention(source string) error {
post.wmLock.Lock() post.wmLock.Lock()
defer post.wmLock.Unlock() defer post.wmLock.Unlock()
resp, err := post.user.repo.HttpClient.Get(source) resp, err := post.user.repo.HttpClient.Get(source)
if err == nil { if err == nil {
webmention, err := post.getIncomingWebmention(source) webmention, err := post.getIncomingWebmentionBySource(source)
if err != nil { if err != nil {
return err return err
} }
@ -297,11 +298,7 @@ func (post *Post) EnrichWebmention(source string) error {
return err return err
} }
func (post *Post) IncomingWebmentions() []WebmentionIn { func (post *Post) ApprovedIncomingWebmentions() []WebmentionIn {
return post.Webmentions().Incoming
}
func (post *Post) ApprovedWebmentions() []WebmentionIn {
webmentions := post.IncomingWebmentions() webmentions := post.IncomingWebmentions()
approved := []WebmentionIn{} approved := []WebmentionIn{}
for _, webmention := range webmentions { for _, webmention := range webmentions {
@ -317,10 +314,6 @@ func (post *Post) ApprovedWebmentions() []WebmentionIn {
return approved return approved
} }
func (post *Post) OutgoingWebmentions() []WebmentionOut {
return post.Webmentions().Outgoing
}
// ScanForLinks scans the post content for links and adds them to the // ScanForLinks scans the post content for links and adds them to the
// `status.yml` file for the post. The links are not scanned by this function. // `status.yml` file for the post. The links are not scanned by this function.
func (post *Post) ScanForLinks() error { func (post *Post) ScanForLinks() error {
@ -329,16 +322,15 @@ func (post *Post) ScanForLinks() error {
postHtml := post.RenderedContent() postHtml := post.RenderedContent()
links, _ := post.user.repo.Parser.ParseLinksFromString(postHtml.String()) links, _ := post.user.repo.Parser.ParseLinksFromString(postHtml.String())
for _, link := range links { for _, link := range links {
post.addOutgoingWebmention(link) post.PersistOutgoingWebmention(&WebmentionOut{
Target: link,
}, true)
} }
return nil return nil
} }
func (post *Post) SendWebmention(webmention WebmentionOut) error { func (post *Post) SendWebmention(webmention WebmentionOut) error {
post.wmLock.Lock() defer post.PersistOutgoingWebmention(&webmention, false)
defer post.wmLock.Unlock()
defer post.UpdateOutgoingWebmention(&webmention)
webmention.ScannedAt = time.Now() webmention.ScannedAt = time.Now()
resp, err := post.user.repo.HttpClient.Get(webmention.Target) resp, err := post.user.repo.HttpClient.Get(webmention.Target)

View File

@ -252,7 +252,7 @@ func TestEnrichAddsTitle(t *testing.T) {
} }
} }
func TestApprovedWebmentions(t *testing.T) { func TestApprovedIncomingWebmentions(t *testing.T) {
repo := getTestRepo(owl.RepoConfig{}) repo := getTestRepo(owl.RepoConfig{})
user, _ := repo.CreateUser("testuser") user, _ := repo.CreateUser("testuser")
post, _ := user.CreateNewPost("testpost") post, _ := user.CreateNewPost("testpost")
@ -281,7 +281,7 @@ func TestApprovedWebmentions(t *testing.T) {
} }
post.PersistIncomingWebmention(webmention) post.PersistIncomingWebmention(webmention)
webmentions := post.ApprovedWebmentions() webmentions := post.ApprovedIncomingWebmentions()
if len(webmentions) != 2 { if len(webmentions) != 2 {
t.Errorf("Expected 2 webmentions, got %d", len(webmentions)) t.Errorf("Expected 2 webmentions, got %d", len(webmentions))
} }
@ -446,9 +446,6 @@ func TestSendingAndReceivingMultipleWebmentions(t *testing.T) {
post.AddIncomingWebmention("http://example.com" + strconv.Itoa(k)) post.AddIncomingWebmention("http://example.com" + strconv.Itoa(k))
wg.Done() wg.Done()
}(i) }(i)
}
for i := 0; i < 20; i++ {
go func(k int) { go func(k int) {
webmention := owl.WebmentionOut{ webmention := owl.WebmentionOut{
Target: "http://example.com" + strconv.Itoa(k), Target: "http://example.com" + strconv.Itoa(k),
@ -472,3 +469,52 @@ func TestSendingAndReceivingMultipleWebmentions(t *testing.T) {
t.Errorf("Expected 20 webmentions, got %d", len(outs)) t.Errorf("Expected 20 webmentions, got %d", len(outs))
} }
} }
func TestComplexParallelWebmentions(t *testing.T) {
repo := getTestRepo(owl.RepoConfig{})
repo.HttpClient = &MockHttpClient{}
repo.Parser = &MockParseLinksHtmlParser{
Links: []string{
"http://example.com/1",
"http://example.com/2",
"http://example.com/3",
},
}
user, _ := repo.CreateUser("testuser")
post, _ := user.CreateNewPost("testpost")
wg := sync.WaitGroup{}
wg.Add(60)
for i := 0; i < 20; i++ {
go func(k int) {
post.AddIncomingWebmention("http://example.com/" + strconv.Itoa(k))
wg.Done()
}(i)
go func(k int) {
webmention := owl.WebmentionOut{
Target: "http://example.com/" + strconv.Itoa(k),
}
post.SendWebmention(webmention)
wg.Done()
}(i)
go func() {
post.ScanForLinks()
wg.Done()
}()
}
wg.Wait()
ins := post.IncomingWebmentions()
if len(ins) != 20 {
t.Errorf("Expected 20 webmentions, got %d", len(ins))
}
outs := post.OutgoingWebmentions()
if len(outs) != 20 {
t.Errorf("Expected 20 webmentions, got %d", len(outs))
}
}

View File

@ -148,42 +148,42 @@ func TestGetWebmentionEndpointRelativeLinkInHeader(t *testing.T) {
} }
} }
func TestRealWorldWebmention(t *testing.T) { // func TestRealWorldWebmention(t *testing.T) {
links := []string{ // links := []string{
"https://webmention.rocks/test/1", // "https://webmention.rocks/test/1",
"https://webmention.rocks/test/2", // "https://webmention.rocks/test/2",
"https://webmention.rocks/test/3", // "https://webmention.rocks/test/3",
"https://webmention.rocks/test/4", // "https://webmention.rocks/test/4",
"https://webmention.rocks/test/5", // "https://webmention.rocks/test/5",
"https://webmention.rocks/test/6", // "https://webmention.rocks/test/6",
"https://webmention.rocks/test/7", // "https://webmention.rocks/test/7",
"https://webmention.rocks/test/8", // "https://webmention.rocks/test/8",
"https://webmention.rocks/test/9", // "https://webmention.rocks/test/9",
// "https://webmention.rocks/test/10", // not supported // // "https://webmention.rocks/test/10", // not supported
"https://webmention.rocks/test/11", // "https://webmention.rocks/test/11",
"https://webmention.rocks/test/12", // "https://webmention.rocks/test/12",
"https://webmention.rocks/test/13", // "https://webmention.rocks/test/13",
"https://webmention.rocks/test/14", // "https://webmention.rocks/test/14",
"https://webmention.rocks/test/15", // "https://webmention.rocks/test/15",
"https://webmention.rocks/test/16", // "https://webmention.rocks/test/16",
"https://webmention.rocks/test/17", // "https://webmention.rocks/test/17",
"https://webmention.rocks/test/18", // "https://webmention.rocks/test/18",
"https://webmention.rocks/test/19", // "https://webmention.rocks/test/19",
"https://webmention.rocks/test/20", // "https://webmention.rocks/test/20",
"https://webmention.rocks/test/21", // "https://webmention.rocks/test/21",
"https://webmention.rocks/test/22", // "https://webmention.rocks/test/22",
"https://webmention.rocks/test/23/page", // "https://webmention.rocks/test/23/page",
} // }
for _, link := range links { // for _, link := range links {
parser := &owl.OwlHtmlParser{} // parser := &owl.OwlHtmlParser{}
client := &owl.OwlHttpClient{} // client := &owl.OwlHttpClient{}
html, _ := client.Get(link) // html, _ := client.Get(link)
_, err := parser.GetWebmentionEndpoint(html) // _, err := parser.GetWebmentionEndpoint(html)
if err != nil { // if err != nil {
t.Errorf("Unable to find webmention: %v for link %v", err, link) // t.Errorf("Unable to find webmention: %v for link %v", err, link)
} // }
} // }
} // }