From c6ecf7037794f509b1187bb364248c42db30fb2b Mon Sep 17 00:00:00 2001 From: Ross Hammermeister Date: Fri, 12 Jun 2020 12:09:53 -0600 Subject: [PATCH] Adding a --concurrency (-C) flag --- CHANGELOG.md | 6 +++++ cmd/task/task.go | 23 ++++++++++-------- concurrency.go | 25 ++++++++++++++++++++ task.go | 39 +++++++++++++++++++++---------- task_test.go | 16 +++++++++++++ testdata/concurrency/Taskfile.yml | 32 +++++++++++++++++++++++++ 6 files changed, 119 insertions(+), 22 deletions(-) create mode 100644 concurrency.go create mode 100644 testdata/concurrency/Taskfile.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index e0df5ae4..3821c354 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +- Add a `--concurrency` (alias `-C`) flag, to limit the number of tasks that + run concurrently. This is useful for heavy workloads. + ([#345](https://github.com/go-task/task/pull/345)). + ## v3.2.2 - 2021-01-12 - Improve performance of `--list` and `--summary` by skipping running shell diff --git a/cmd/task/task.go b/cmd/task/task.go index a5460280..936d7fc6 100644 --- a/cmd/task/task.go +++ b/cmd/task/task.go @@ -65,6 +65,7 @@ func main() { dry bool summary bool parallel bool + concurrency int dir string entrypoint string output string @@ -87,6 +88,7 @@ func main() { pflag.StringVarP(&entrypoint, "taskfile", "t", "", `choose which Taskfile to run. Defaults to "Taskfile.yml"`) pflag.StringVarP(&output, "output", "o", "", "sets output style: [interleaved|group|prefixed]") pflag.BoolVarP(&color, "color", "c", true, "colored output") + pflag.IntVarP(&concurrency, "concurrency", "C", 0, "limit number tasks to run concurrently") pflag.Parse() if versionFlag { @@ -122,16 +124,17 @@ func main() { } e := task.Executor{ - Force: force, - Watch: watch, - Verbose: verbose, - Silent: silent, - Dir: dir, - Dry: dry, - Entrypoint: entrypoint, - Summary: summary, - Parallel: parallel, - Color: color, + Force: force, + Watch: watch, + Verbose: verbose, + Silent: silent, + Dir: dir, + Dry: dry, + Entrypoint: entrypoint, + Summary: summary, + Parallel: parallel, + Color: color, + Concurrency: concurrency, Stdin: os.Stdin, Stdout: os.Stdout, diff --git a/concurrency.go b/concurrency.go new file mode 100644 index 00000000..14a3704e --- /dev/null +++ b/concurrency.go @@ -0,0 +1,25 @@ +package task + +func (e *Executor) acquireConcurrencyLimit() func() { + if e.concurrencySemaphore == nil { + return emptyFunc + } + + e.concurrencySemaphore <- struct{}{} + return func() { + <-e.concurrencySemaphore + } +} + +func (e *Executor) releaseConcurrencyLimit() func() { + if e.concurrencySemaphore == nil { + return emptyFunc + } + + <-e.concurrencySemaphore + return func() { + e.concurrencySemaphore <- struct{}{} + } +} + +func emptyFunc() {} diff --git a/task.go b/task.go index 5fb70f9a..77c901cd 100644 --- a/task.go +++ b/task.go @@ -32,16 +32,17 @@ const ( type Executor struct { Taskfile *taskfile.Taskfile - Dir string - Entrypoint string - Force bool - Watch bool - Verbose bool - Silent bool - Dry bool - Summary bool - Parallel bool - Color bool + Dir string + Entrypoint string + Force bool + Watch bool + Verbose bool + Silent bool + Dry bool + Summary bool + Parallel bool + Color bool + Concurrency int Stdin io.Reader Stdout io.Writer @@ -54,8 +55,9 @@ type Executor struct { taskvars *taskfile.Vars - taskCallCount map[string]*int32 - mkdirMutexMap map[string]*sync.Mutex + concurrencySemaphore chan struct{} + taskCallCount map[string]*int32 + mkdirMutexMap map[string]*sync.Mutex } // Run runs Task @@ -247,6 +249,10 @@ func (e *Executor) Setup() error { e.taskCallCount[k] = new(int32) e.mkdirMutexMap[k] = &sync.Mutex{} } + + if e.Concurrency > 0 { + e.concurrencySemaphore = make(chan struct{}, e.Concurrency) + } return nil } @@ -260,6 +266,9 @@ func (e *Executor) RunTask(ctx context.Context, call taskfile.Call) error { return &MaximumTaskCallExceededError{task: call.Task} } + release := e.acquireConcurrencyLimit() + defer release() + if err := e.runDeps(ctx, t); err != nil { return err } @@ -324,6 +333,9 @@ func (e *Executor) mkdir(t *taskfile.Task) error { func (e *Executor) runDeps(ctx context.Context, t *taskfile.Task) error { g, ctx := errgroup.WithContext(ctx) + reacquire := e.releaseConcurrencyLimit() + defer reacquire() + for _, d := range t.Deps { d := d @@ -344,6 +356,9 @@ func (e *Executor) runCommand(ctx context.Context, t *taskfile.Task, call taskfi switch { case cmd.Task != "": + reacquire := e.releaseConcurrencyLimit() + defer reacquire() + err := e.RunTask(ctx, taskfile.Call{Task: cmd.Task, Vars: cmd.Vars}) if err != nil { return err diff --git a/task_test.go b/task_test.go index e2369b20..4c16d49c 100644 --- a/task_test.go +++ b/task_test.go @@ -171,6 +171,22 @@ func TestVarsInvalidTmpl(t *testing.T) { assert.EqualError(t, e.Run(context.Background(), taskfile.Call{Task: target}), expectError, "e.Run(target)") } +func TestConcurrency(t *testing.T) { + const ( + dir = "testdata/concurrency" + target = "default" + ) + + e := &task.Executor{ + Dir: dir, + Stdout: ioutil.Discard, + Stderr: ioutil.Discard, + Concurrency: 1, + } + assert.NoError(t, e.Setup(), "e.Setup()") + assert.NoError(t, e.Run(context.Background(), taskfile.Call{Task: target}), "e.Run(target)") +} + func TestParams(t *testing.T) { tt := fileContentTest{ Dir: "testdata/params", diff --git a/testdata/concurrency/Taskfile.yml b/testdata/concurrency/Taskfile.yml new file mode 100644 index 00000000..6a6e6c4f --- /dev/null +++ b/testdata/concurrency/Taskfile.yml @@ -0,0 +1,32 @@ +version: '2' + +tasks: + default: + deps: + - t1 + + t1: + deps: + - t3 + - t4 + cmds: + - task: t2 + - echo done 1 + t2: + deps: + - t5 + - t6 + cmds: + - echo done 2 + t3: + cmds: + - echo done 3 + t4: + cmds: + - echo done 4 + t5: + cmds: + - echo done 5 + t6: + cmds: + - echo done 6