feat: use XORM EngineGroup instead of single Engine connection (#7212)

Resolves #7207

Add new configuration to make XORM work with a main and replicas database instances. The follow configuration parameters were added:

- `HOST_PRIMARY`
- `HOST_REPLICAS`
- `LOAD_BALANCE_POLICY`. Options:
    - `"WeightRandom"` -> `xorm.WeightRandomPolicy`
    - `"WeightRoundRobin`  -> `WeightRoundRobinPolicy`
    - `"LeastCon"` -> `LeastConnPolicy`
    - `"RoundRobin"` -> `xorm.RoundRobinPolicy()`
    - default: `xorm.RandomPolicy()`
- `LOAD_BALANCE_WEIGHTS`

Co-authored-by: pat-s <patrick.schratz@gmail.com@>
Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/7212
Reviewed-by: Gusted <gusted@noreply.codeberg.org>
Co-authored-by: pat-s <patrick.schratz@gmail.com>
Co-committed-by: pat-s <patrick.schratz@gmail.com>
This commit is contained in:
pat-s 2025-03-30 11:34:02 +00:00 committed by Gusted
parent a23d0453a3
commit 63a80bf2b9
19 changed files with 463 additions and 129 deletions

View file

@ -95,34 +95,70 @@ func init() {
}
}
// newXORMEngine returns a new XORM engine from the configuration
func newXORMEngine() (*xorm.Engine, error) {
connStr, err := setting.DBConnStr()
// newXORMEngineGroup creates an xorm.EngineGroup (with one master and one or more slaves).
// It assumes you have separate master and slave DSNs defined via the settings package.
func newXORMEngineGroup() (Engine, error) {
// Retrieve master DSN from settings.
masterConnStr, err := setting.DBMasterConnStr()
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to determine master DSN: %w", err)
}
var engine *xorm.Engine
var masterEngine *xorm.Engine
// For PostgreSQL: if a schema is provided, we use the special "postgresschema" driver.
if setting.Database.Type.IsPostgreSQL() && len(setting.Database.Schema) > 0 {
// OK whilst we sort out our schema issues - create a schema aware postgres
registerPostgresSchemaDriver()
engine, err = xorm.NewEngine("postgresschema", connStr)
masterEngine, err = xorm.NewEngine("postgresschema", masterConnStr)
} else {
engine, err = xorm.NewEngine(setting.Database.Type.String(), connStr)
masterEngine, err = xorm.NewEngine(setting.Database.Type.String(), masterConnStr)
}
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create master engine: %w", err)
}
if setting.Database.Type.IsMySQL() {
engine.Dialect().SetParams(map[string]string{"rowFormat": "DYNAMIC"})
masterEngine.Dialect().SetParams(map[string]string{"rowFormat": "DYNAMIC"})
}
engine.SetSchema(setting.Database.Schema)
return engine, nil
masterEngine.SetSchema(setting.Database.Schema)
slaveConnStrs, err := setting.DBSlaveConnStrs()
if err != nil {
return nil, fmt.Errorf("failed to load slave DSNs: %w", err)
}
var slaveEngines []*xorm.Engine
// Iterate over all slave DSNs and create engines
for _, dsn := range slaveConnStrs {
slaveEngine, err := xorm.NewEngine(setting.Database.Type.String(), dsn)
if err != nil {
return nil, fmt.Errorf("failed to create slave engine for dsn %q: %w", dsn, err)
}
if setting.Database.Type.IsMySQL() {
slaveEngine.Dialect().SetParams(map[string]string{"rowFormat": "DYNAMIC"})
}
slaveEngine.SetSchema(setting.Database.Schema)
slaveEngines = append(slaveEngines, slaveEngine)
}
policy := setting.BuildLoadBalancePolicy(&setting.Database, slaveEngines)
// Create the EngineGroup using the selected policy
group, err := xorm.NewEngineGroup(masterEngine, slaveEngines, policy)
if err != nil {
return nil, fmt.Errorf("failed to create engine group: %w", err)
}
return engineGroupWrapper{group}, nil
}
// SyncAllTables sync the schemas of all tables, is required by unit test code
type engineGroupWrapper struct {
*xorm.EngineGroup
}
func (w engineGroupWrapper) AddHook(hook contexts.Hook) bool {
w.EngineGroup.AddHook(hook)
return true
}
// SyncAllTables sync the schemas of all tables
func SyncAllTables() error {
_, err := x.StoreEngine("InnoDB").SyncWithOptions(xorm.SyncOptions{
WarnIfDatabaseColumnMissed: true,
@ -130,52 +166,61 @@ func SyncAllTables() error {
return err
}
// InitEngine initializes the xorm.Engine and sets it as db.DefaultContext
// InitEngine initializes the xorm EngineGroup and sets it as db.DefaultContext
func InitEngine(ctx context.Context) error {
xormEngine, err := newXORMEngine()
xormEngine, err := newXORMEngineGroup()
if err != nil {
return fmt.Errorf("failed to connect to database: %w", err)
}
// Try to cast to the concrete type to access diagnostic methods
if eng, ok := xormEngine.(engineGroupWrapper); ok {
eng.SetMapper(names.GonicMapper{})
// WARNING: for serv command, MUST remove the output to os.Stdout,
// so use a log file instead of printing to stdout.
eng.SetLogger(NewXORMLogger(setting.Database.LogSQL))
eng.ShowSQL(setting.Database.LogSQL)
eng.SetMaxOpenConns(setting.Database.MaxOpenConns)
eng.SetMaxIdleConns(setting.Database.MaxIdleConns)
eng.SetConnMaxLifetime(setting.Database.ConnMaxLifetime)
eng.SetConnMaxIdleTime(setting.Database.ConnMaxIdleTime)
eng.SetDefaultContext(ctx)
xormEngine.SetMapper(names.GonicMapper{})
// WARNING: for serv command, MUST remove the output to os.stdout,
// so use log file to instead print to stdout.
xormEngine.SetLogger(NewXORMLogger(setting.Database.LogSQL))
xormEngine.ShowSQL(setting.Database.LogSQL)
xormEngine.SetMaxOpenConns(setting.Database.MaxOpenConns)
xormEngine.SetMaxIdleConns(setting.Database.MaxIdleConns)
xormEngine.SetConnMaxLifetime(setting.Database.ConnMaxLifetime)
xormEngine.SetConnMaxIdleTime(setting.Database.ConnMaxIdleTime)
xormEngine.SetDefaultContext(ctx)
if setting.Database.SlowQueryThreshold > 0 {
eng.AddHook(&SlowQueryHook{
Treshold: setting.Database.SlowQueryThreshold,
Logger: log.GetLogger("xorm"),
})
}
if setting.Database.SlowQueryThreshold > 0 {
xormEngine.AddHook(&SlowQueryHook{
Treshold: setting.Database.SlowQueryThreshold,
Logger: log.GetLogger("xorm"),
errorLogger := log.GetLogger("xorm")
if setting.IsInTesting {
errorLogger = log.GetLogger(log.DEFAULT)
}
eng.AddHook(&ErrorQueryHook{
Logger: errorLogger,
})
eng.AddHook(&TracingHook{})
SetDefaultEngine(ctx, eng)
} else {
// Fallback: if type assertion fails, set default engine without extended diagnostics
SetDefaultEngine(ctx, xormEngine)
}
errorLogger := log.GetLogger("xorm")
if setting.IsInTesting {
errorLogger = log.GetLogger(log.DEFAULT)
}
xormEngine.AddHook(&ErrorQueryHook{
Logger: errorLogger,
})
xormEngine.AddHook(&TracingHook{})
SetDefaultEngine(ctx, xormEngine)
return nil
}
// SetDefaultEngine sets the default engine for db
func SetDefaultEngine(ctx context.Context, eng *xorm.Engine) {
x = eng
// SetDefaultEngine sets the default engine for db.
func SetDefaultEngine(ctx context.Context, eng Engine) {
masterEngine, err := GetMasterEngine(eng)
if err == nil {
x = masterEngine
}
DefaultContext = &Context{
Context: ctx,
e: x,
e: eng,
}
}
@ -191,12 +236,12 @@ func UnsetDefaultEngine() {
DefaultContext = nil
}
// InitEngineWithMigration initializes a new xorm.Engine and sets it as the db.DefaultContext
// InitEngineWithMigration initializes a new xorm EngineGroup, runs migrations, and sets it as db.DefaultContext
// This function must never call .Sync() if the provided migration function fails.
// When called from the "doctor" command, the migration function is a version check
// that prevents the doctor from fixing anything in the database if the migration level
// is different from the expected value.
func InitEngineWithMigration(ctx context.Context, migrateFunc func(*xorm.Engine) error) (err error) {
func InitEngineWithMigration(ctx context.Context, migrateFunc func(Engine) error) (err error) {
if err = InitEngine(ctx); err != nil {
return err
}
@ -230,14 +275,14 @@ func InitEngineWithMigration(ctx context.Context, migrateFunc func(*xorm.Engine)
return nil
}
// NamesToBean return a list of beans or an error
// NamesToBean returns a list of beans given names
func NamesToBean(names ...string) ([]any, error) {
beans := []any{}
if len(names) == 0 {
beans = append(beans, tables...)
return beans, nil
}
// Need to map provided names to beans...
// Map provided names to beans
beanMap := make(map[string]any)
for _, bean := range tables {
beanMap[strings.ToLower(reflect.Indirect(reflect.ValueOf(bean)).Type().Name())] = bean
@ -259,7 +304,7 @@ func NamesToBean(names ...string) ([]any, error) {
return beans, nil
}
// DumpDatabase dumps all data from database according the special database SQL syntax to file system.
// DumpDatabase dumps all data from database using special SQL syntax to the file system.
func DumpDatabase(filePath, dbType string) error {
var tbs []*schemas.Table
for _, t := range tables {
@ -295,29 +340,33 @@ func MaxBatchInsertSize(bean any) int {
return 999 / len(t.ColumnsSeq())
}
// IsTableNotEmpty returns true if table has at least one record
// IsTableNotEmpty returns true if the table has at least one record
func IsTableNotEmpty(beanOrTableName any) (bool, error) {
return x.Table(beanOrTableName).Exist()
}
// DeleteAllRecords will delete all the records of this table
// DeleteAllRecords deletes all records in the given table.
func DeleteAllRecords(tableName string) error {
_, err := x.Exec(fmt.Sprintf("DELETE FROM %s", tableName))
return err
}
// GetMaxID will return max id of the table
// GetMaxID returns the maximum id in the table
func GetMaxID(beanOrTableName any) (maxID int64, err error) {
_, err = x.Select("MAX(id)").Table(beanOrTableName).Get(&maxID)
return maxID, err
}
func SetLogSQL(ctx context.Context, on bool) {
e := GetEngine(ctx)
if x, ok := e.(*xorm.Engine); ok {
x.ShowSQL(on)
} else if sess, ok := e.(*xorm.Session); ok {
ctxEngine := GetEngine(ctx)
if sess, ok := ctxEngine.(*xorm.Session); ok {
sess.Engine().ShowSQL(on)
} else if wrapper, ok := ctxEngine.(engineGroupWrapper); ok {
// Handle engineGroupWrapper directly
wrapper.ShowSQL(on)
} else if masterEngine, err := GetMasterEngine(ctxEngine); err == nil {
masterEngine.ShowSQL(on)
}
}
@ -374,3 +423,18 @@ func (h *ErrorQueryHook) AfterProcess(c *contexts.ContextHook) error {
}
return nil
}
// GetMasterEngine extracts the master xorm.Engine from the provided xorm.Engine.
// This handles both direct xorm.Engine cases and engines that implement a Master() method.
func GetMasterEngine(x Engine) (*xorm.Engine, error) {
if getter, ok := x.(interface{ Master() *xorm.Engine }); ok {
return getter.Master(), nil
}
engine, ok := x.(*xorm.Engine)
if !ok {
return nil, fmt.Errorf("unsupported engine type: %T", x)
}
return engine, nil
}

View file

@ -33,10 +33,11 @@ func getCurrentResourceIndex(ctx context.Context, tableName string, groupID int6
func TestSyncMaxResourceIndex(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
xe := unittest.GetXORMEngine()
xe, err := unittest.GetXORMEngine()
require.NoError(t, err)
require.NoError(t, xe.Sync(&TestIndex{}))
err := db.SyncMaxResourceIndex(db.DefaultContext, "test_index", 10, 51)
err = db.SyncMaxResourceIndex(db.DefaultContext, "test_index", 10, 51)
require.NoError(t, err)
// sync new max index
@ -88,7 +89,8 @@ func TestSyncMaxResourceIndex(t *testing.T) {
func TestGetNextResourceIndex(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
xe := unittest.GetXORMEngine()
xe, err := unittest.GetXORMEngine()
require.NoError(t, err)
require.NoError(t, xe.Sync(&TestIndex{}))
// create a new record

View file

@ -17,7 +17,8 @@ import (
func TestIterate(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
xe := unittest.GetXORMEngine()
xe, err := unittest.GetXORMEngine()
require.NoError(t, err)
require.NoError(t, xe.Sync(&repo_model.RepoUnit{}))
cnt, err := db.GetEngine(db.DefaultContext).Count(&repo_model.RepoUnit{})

View file

@ -29,11 +29,12 @@ func (opts mockListOptions) ToConds() builder.Cond {
func TestFind(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
xe := unittest.GetXORMEngine()
xe, err := unittest.GetXORMEngine()
require.NoError(t, err)
require.NoError(t, xe.Sync(&repo_model.RepoUnit{}))
var repoUnitCount int
_, err := db.GetEngine(db.DefaultContext).SQL("SELECT COUNT(*) FROM repo_unit").Get(&repoUnitCount)
_, err = db.GetEngine(db.DefaultContext).SQL("SELECT COUNT(*) FROM repo_unit").Get(&repoUnitCount)
require.NoError(t, err)
assert.NotEmpty(t, repoUnitCount)

View file

@ -8,6 +8,7 @@ import (
"context"
"fmt"
"forgejo.org/models/db"
"forgejo.org/models/forgejo_migrations"
"forgejo.org/models/migrations/v1_10"
"forgejo.org/models/migrations/v1_11"
@ -510,3 +511,12 @@ Please try upgrading to a lower version first (suggested v1.6.4), then upgrade t
// Execute Forgejo specific migrations.
return forgejo_migrations.Migrate(x)
}
// WrapperMigrate is a wrapper for Migrate to be called in diagnostics
func WrapperMigrate(e db.Engine) error {
engine, err := db.GetMasterEngine(e)
if err != nil {
return err
}
return Migrate(engine)
}

View file

@ -175,7 +175,10 @@ func newXORMEngine() (*xorm.Engine, error) {
if err := db.InitEngine(context.Background()); err != nil {
return nil, err
}
x := unittest.GetXORMEngine()
x, err := unittest.GetXORMEngine()
if err != nil {
return nil, err
}
return x, nil
}

View file

@ -22,11 +22,11 @@ import (
var fixturesLoader *testfixtures.Loader
// GetXORMEngine gets the XORM engine
func GetXORMEngine(engine ...*xorm.Engine) (x *xorm.Engine) {
func GetXORMEngine(engine ...*xorm.Engine) (x *xorm.Engine, err error) {
if len(engine) == 1 {
return engine[0]
return engine[0], nil
}
return db.DefaultContext.(*db.Context).Engine().(*xorm.Engine)
return db.GetMasterEngine(db.DefaultContext.(*db.Context).Engine())
}
func OverrideFixtures(opts FixturesOptions, engine ...*xorm.Engine) func() {
@ -41,7 +41,10 @@ func OverrideFixtures(opts FixturesOptions, engine ...*xorm.Engine) func() {
// InitFixtures initialize test fixtures for a test database
func InitFixtures(opts FixturesOptions, engine ...*xorm.Engine) (err error) {
e := GetXORMEngine(engine...)
e, err := GetXORMEngine(engine...)
if err != nil {
return err
}
var fixtureOptionFiles func(*testfixtures.Loader) error
if opts.Dir != "" {
fixtureOptionFiles = testfixtures.Directory(opts.Dir)
@ -93,10 +96,12 @@ func InitFixtures(opts FixturesOptions, engine ...*xorm.Engine) (err error) {
// LoadFixtures load fixtures for a test database
func LoadFixtures(engine ...*xorm.Engine) error {
e := GetXORMEngine(engine...)
var err error
e, err := GetXORMEngine(engine...)
if err != nil {
return err
}
// (doubt) database transaction conflicts could occur and result in ROLLBACK? just try for a few times.
for i := 0; i < 5; i++ {
for range 5 {
if err = fixturesLoader.Load(); err == nil {
break
}