mirror of
https://gitea.com/gitea/act_runner
synced 2026-05-01 01:27:56 +02:00
Clone different git repos in parallel via per-directory locks (#866)
Old `cloneLock` is a package-level `sync.Mutex` that serialized every action clone across all goroutines, regardless of target directory. This PR replaces it with a `sync.Map` of per-directory mutexes keyed by `input.Dir`. Same-directory operations still serialize; different directories now clone in parallel. Reviewed-on: https://gitea.com/gitea/act_runner/pulls/866 Reviewed-by: Lunny Xiao <xiaolunwen@gmail.com> Co-authored-by: Zettat123 <zettat123@gmail.com> Co-committed-by: Zettat123 <zettat123@gmail.com>
This commit is contained in:
@@ -32,12 +32,21 @@ var (
|
|||||||
githubHTTPRegex = regexp.MustCompile(`^https?://.*github.com.*/(.+)/(.+?)(?:.git)?$`)
|
githubHTTPRegex = regexp.MustCompile(`^https?://.*github.com.*/(.+)/(.+?)(?:.git)?$`)
|
||||||
githubSSHRegex = regexp.MustCompile(`github.com[:/](.+)/(.+?)(?:.git)?$`)
|
githubSSHRegex = regexp.MustCompile(`github.com[:/](.+)/(.+?)(?:.git)?$`)
|
||||||
|
|
||||||
cloneLock sync.Mutex
|
cloneLocks sync.Map // key: clone target directory; value: *sync.Mutex
|
||||||
|
|
||||||
ErrShortRef = errors.New("short SHA references are not supported")
|
ErrShortRef = errors.New("short SHA references are not supported")
|
||||||
ErrNoRepo = errors.New("unable to find git repo")
|
ErrNoRepo = errors.New("unable to find git repo")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// acquireCloneLock returns an unlock function after locking the per-directory mutex for dir.
|
||||||
|
// Only concurrent operations targeting the same directory are erialized; clones into different directories run in parallel.
|
||||||
|
func acquireCloneLock(dir string) func() {
|
||||||
|
v, _ := cloneLocks.LoadOrStore(dir, &sync.Mutex{})
|
||||||
|
mu := v.(*sync.Mutex)
|
||||||
|
mu.Lock()
|
||||||
|
return mu.Unlock
|
||||||
|
}
|
||||||
|
|
||||||
type Error struct {
|
type Error struct {
|
||||||
err error
|
err error
|
||||||
commit string
|
commit string
|
||||||
@@ -301,8 +310,7 @@ func NewGitCloneExecutor(input NewGitCloneExecutorInput) common.Executor {
|
|||||||
logger.Infof(" \u2601 git clone '%s' # ref=%s", input.URL, input.Ref)
|
logger.Infof(" \u2601 git clone '%s' # ref=%s", input.URL, input.Ref)
|
||||||
logger.Debugf(" cloning %s to %s", input.URL, input.Dir)
|
logger.Debugf(" cloning %s to %s", input.URL, input.Dir)
|
||||||
|
|
||||||
cloneLock.Lock()
|
defer acquireCloneLock(input.Dir)()
|
||||||
defer cloneLock.Unlock()
|
|
||||||
|
|
||||||
refName := plumbing.ReferenceName("refs/heads/" + input.Ref)
|
refName := plumbing.ReferenceName("refs/heads/" + input.Ref)
|
||||||
r, err := CloneIfRequired(ctx, refName, input, logger)
|
r, err := CloneIfRequired(ctx, refName, input, logger)
|
||||||
|
|||||||
@@ -11,8 +11,10 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@@ -303,3 +305,61 @@ func gitCmd(args ...string) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAcquireCloneLock(t *testing.T) {
|
||||||
|
t.Run("same directory serializes", func(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
unlock1 := acquireCloneLock(dir)
|
||||||
|
|
||||||
|
secondAcquired := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
unlock := acquireCloneLock(dir)
|
||||||
|
close(secondAcquired)
|
||||||
|
unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-secondAcquired:
|
||||||
|
t.Fatal("second acquire should block while first holds the lock")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
unlock1()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-secondAcquired:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("second acquire should proceed after first releases the lock")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("different directories do not block", func(t *testing.T) {
|
||||||
|
dirA := t.TempDir()
|
||||||
|
dirB := t.TempDir()
|
||||||
|
|
||||||
|
unlockA := acquireCloneLock(dirA)
|
||||||
|
defer unlockA()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
unlock := acquireCloneLock(dirB)
|
||||||
|
unlock()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("acquire on a different directory must not block")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("same directory reuses the same mutex", func(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
v1, _ := cloneLocks.LoadOrStore(dir, &sync.Mutex{})
|
||||||
|
v2, _ := cloneLocks.LoadOrStore(dir, &sync.Mutex{})
|
||||||
|
require.Same(t, v1, v2)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user