fix(actions): retry workflow insertion on database deadlock #221
@@ -5,6 +5,7 @@ package db
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.mokoconsulting.tech/MokoConsulting/MokoGitea/modules/util"
|
||||
)
|
||||
@@ -72,3 +73,27 @@ func (err ErrNotExist) Error() string {
|
||||
func (err ErrNotExist) Unwrap() error {
|
||||
return util.ErrNotExist
|
||||
}
|
||||
|
||||
// IsErrDeadlock checks whether err is a database deadlock.
|
||||
// MySQL returns error 1213 (ER_LOCK_DEADLOCK / SQLSTATE 40001).
|
||||
// PostgreSQL returns SQLSTATE 40P01 with "deadlock detected".
|
||||
// SQLite returns SQLITE_BUSY (error 5) with "database is locked".
|
||||
func IsErrDeadlock(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
msg := err.Error()
|
||||
// MySQL / MariaDB: "Error 1213 (40001): Deadlock found when trying to get lock"
|
||||
if strings.Contains(msg, "Error 1213") || strings.Contains(msg, "40001") {
|
||||
return true
|
||||
}
|
||||
// PostgreSQL: "deadlock detected"
|
||||
if strings.Contains(msg, "deadlock detected") {
|
||||
return true
|
||||
}
|
||||
// SQLite: "database is locked"
|
||||
if strings.Contains(msg, "database is locked") {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package db
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestIsErrDeadlock(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
err error
|
||||
want bool
|
||||
}{
|
||||
{name: "nil", err: nil, want: false},
|
||||
{name: "unrelated", err: errors.New("connection refused"), want: false},
|
||||
{name: "mysql 1213", err: errors.New("Error 1213 (40001): Deadlock found when trying to get lock; try restarting transaction"), want: true},
|
||||
{name: "mysql sqlstate", err: errors.New("SQLSTATE 40001: serialization failure"), want: true},
|
||||
{name: "postgres", err: errors.New("pq: deadlock detected"), want: true},
|
||||
{name: "sqlite", err: errors.New("database is locked"), want: true},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
assert.Equal(t, tt.want, IsErrDeadlock(tt.err))
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
actions_model "git.mokoconsulting.tech/MokoConsulting/MokoGitea/models/actions"
|
||||
"git.mokoconsulting.tech/MokoConsulting/MokoGitea/models/db"
|
||||
@@ -344,7 +345,7 @@ func handleWorkflows(
|
||||
|
||||
run.NeedApproval = need
|
||||
|
||||
if err := PrepareRunAndInsert(ctx, dwf.Content, run, nil); err != nil {
|
||||
if err := prepareRunAndInsertWithRetry(ctx, dwf.Content, run); err != nil {
|
||||
log.Error("PrepareRunAndInsert: %v", err)
|
||||
continue
|
||||
}
|
||||
@@ -352,6 +353,54 @@ func handleWorkflows(
|
||||
return nil
|
||||
}
|
||||
|
||||
// prepareRunAndInsertWithRetry wraps PrepareRunAndInsert with retries on
|
||||
// database deadlocks. When multiple workflow runs are inserted for the same
|
||||
// event (e.g. several workflows triggered by a single pull_request), each
|
||||
// InsertRun transaction acquires an X-lock on the repository row (via
|
||||
// UpdateRepoRunsNumbers) and an index lock on action_run. Two concurrent
|
||||
// transactions can deadlock when each holds one lock and waits for the other.
|
||||
// InnoDB resolves this by killing the lighter transaction, but handleWorkflows
|
||||
// only logged the error and moved on — silently dropping the workflow run.
|
||||
// Retrying the insert is safe because the rolled-back transaction left no
|
||||
// partial state.
|
||||
func prepareRunAndInsertWithRetry(ctx context.Context, content []byte, run *actions_model.ActionRun) error {
|
||||
const maxRetries = 3
|
||||
backoff := 50 * time.Millisecond
|
||||
|
||||
// Save original values that InsertRun mutates inside its transaction.
|
||||
// On deadlock rollback these become stale and must be reset before retry.
|
||||
origTitle := run.Title
|
||||
|
||||
var err error
|
||||
for attempt := range maxRetries {
|
||||
if err = PrepareRunAndInsert(ctx, content, run, nil); err == nil {
|
||||
return nil
|
||||
}
|
||||
if !db.IsErrDeadlock(err) {
|
||||
return err
|
||||
}
|
||||
log.Warn("PrepareRunAndInsert deadlock (attempt %d/%d) for workflow %s in repo %d, retrying: %v",
|
||||
attempt+1, maxRetries, run.WorkflowID, run.RepoID, err)
|
||||
|
||||
// Reset fields that InsertRun sets inside the (now rolled-back) transaction
|
||||
// so the next attempt starts clean.
|
||||
run.ID = 0
|
||||
run.Index = 0
|
||||
run.Status = actions_model.StatusWaiting
|
||||
run.Title = origTitle
|
||||
run.ConcurrencyGroup = ""
|
||||
run.ConcurrencyCancel = false
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(backoff):
|
||||
}
|
||||
backoff *= 2
|
||||
}
|
||||
return fmt.Errorf("deadlock persisted after %d retries: %w", maxRetries, err)
|
||||
}
|
||||
|
||||
func newNotifyInputFromIssue(issue *issues_model.Issue, event webhook_module.HookEventType) *notifyInput {
|
||||
return newNotifyInput(issue.Repo, issue.Poster, event)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user