forkjo/modules/queue/base_redis_test.go

72 lines
1.7 KiB
Go
Raw Normal View History

Rewrite queue (#24505) # ⚠️ Breaking Many deprecated queue config options are removed (actually, they should have been removed in 1.18/1.19). If you see the fatal message when starting Gitea: "Please update your app.ini to remove deprecated config options", please follow the error messages to remove these options from your app.ini. Example: ``` 2023/05/06 19:39:22 [E] Removed queue option: `[indexer].ISSUE_INDEXER_QUEUE_TYPE`. Use new options in `[queue.issue_indexer]` 2023/05/06 19:39:22 [E] Removed queue option: `[indexer].UPDATE_BUFFER_LEN`. Use new options in `[queue.issue_indexer]` 2023/05/06 19:39:22 [F] Please update your app.ini to remove deprecated config options ``` Many options in `[queue]` are dropped, including: `WRAP_IF_NECESSARY`, `MAX_ATTEMPTS`, `TIMEOUT`, `WORKERS`, `BLOCK_TIMEOUT`, `BOOST_TIMEOUT`, `BOOST_WORKERS`, they can be removed from app.ini. # The problem The old queue package has some legacy problems: * complexity: I doubt many people could tell how it works. * maintainability: Too many channels and mutex/cond are mixed together, too many different structs/interfaces depend on each other. * stability: due to the complexity & maintainability, sometimes there are strange bugs that are difficult to debug, and some code doesn't have tests (indeed some code is difficult to test because a lot of things are mixed together). * general applicability: although it is called "queue", its behavior is not a well-known queue. * scalability: it doesn't seem easy to make it work with a cluster without breaking its behaviors. It came from some very old code to "avoid breaking", however, its technical debt is too heavy now. It's a good time to introduce a better "queue" package. # The new queue package It keeps using old config and concept as much as possible. * It only contains two major kinds of concepts: * The "base queue": channel, levelqueue, redis * They have the same abstraction, the same interface, and they are tested by the same testing code.
* The "WorkerPoolQueue", it uses the "base queue" to provide "worker pool" function, calls the "handler" to process the data in the base queue. * The new code doesn't do "PushBack" * Think about a queue with many workers, the "PushBack" can't guarantee the order for re-queued unhandled items, so in new code it just does "normal push" * The new code doesn't do "pause/resume" * The "pause/resume" was designed to handle some handler's failure: eg: document indexer (elasticsearch) is down * If a queue is paused for long time, either the producers block or the new items are dropped. * The new code doesn't do such "pause/resume" trick, it's not a common queue's behavior and it doesn't help much. * If there are unhandled items, the "push" function just blocks for a few seconds and then re-queue them and retry. * The new code doesn't do "worker booster" * Gitea's queue's handlers are light functions, the cost is only the go-routine, so it doesn't make sense to "boost" them. * The new code only uses "max worker number" to limit the concurrent workers. * The new "Push" never blocks forever * Instead of creating more and more blocking goroutines, returning an error is more friendly to the server and to the end user. There are more details in code comments: eg: the "Flush" problem, the strange "code.index" hanging problem, the "immediate" queue problem. Almost ready for review. TODO: * [x] add some necessary comments during review * [x] add some more tests if necessary * [x] update documents and config options * [x] test max worker / active worker * [x] re-run the CI tasks to see whether any test is flaky * [x] improve the `handleOldLengthConfiguration` to provide more friendly messages * [x] fine tune default config values (eg: length?) ## Code coverage: ![image](https://user-images.githubusercontent.com/2114189/236620635-55576955-f95d-4810-b12f-879026a3afdf.png)
2023-05-08 13:49:59 +02:00
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package queue
import (
"context"
"os"
"os/exec"
"testing"
"time"
"code.gitea.io/gitea/modules/nosql"
"code.gitea.io/gitea/modules/setting"
"github.com/stretchr/testify/assert"
)
// waitRedisReady repeatedly pings the Redis client obtained for conn until a
// ping succeeds or more than dur has elapsed since the first attempt, sleeping
// 50ms between attempts. It reports whether a ping succeeded.
//
// At least one ping is always attempted, so dur == 0 performs a single probe.
func waitRedisReady(conn string, dur time.Duration) (ready bool) {
	// A single context bounds every attempt, so it has to outlive the polling
	// window. With a fixed 5s deadline, a dur above 5s would spend the rest of
	// its window issuing pings on an already expired context.
	timeout := 5 * time.Second
	if dur > timeout {
		timeout = dur
	}
	ctxTimed, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	for t := time.Now(); ; time.Sleep(50 * time.Millisecond) {
		ret := nosql.GetManager().GetRedisClient(conn).Ping(ctxTimed)
		if ret.Err() == nil {
			return true
		}
		// Checked after the ping so that the first attempt is never skipped.
		if time.Since(t) > dur {
			return false
		}
	}
}
// redisServerCmd builds, without starting it, a command that runs the
// "redis-server" binary found on PATH with "--bind 127.0.0.1 --port 6379".
// The command's working directory is a fresh t.TempDir() and its standard
// streams are wired to those of the current process.
//
// It returns nil when no "redis-server" executable can be located.
func redisServerCmd(t *testing.T) *exec.Cmd {
	binPath, lookupErr := exec.LookPath("redis-server")
	if lookupErr != nil {
		return nil
	}

	cmd := new(exec.Cmd)
	cmd.Path = binPath
	cmd.Args = []string{binPath, "--bind", "127.0.0.1", "--port", "6379"}
	cmd.Dir = t.TempDir()
	cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
	return cmd
}
func TestBaseRedis(t *testing.T) {
var redisServer *exec.Cmd
defer func() {
if redisServer != nil {
_ = redisServer.Process.Signal(os.Interrupt)
_ = redisServer.Wait()
}
}()
if !waitRedisReady("redis://127.0.0.1:6379/0", 0) {
redisServer = redisServerCmd(t)
[CI] disable redis test, no redis server yet in CI (cherry picked from commit e1bbfa36197ebab97954e8195f7d36adf7c85d56) (cherry picked from commit 91245ca9179a46047a351247dacecdace557111d) (cherry picked from commit 705d0558be2c90d06e9e5b883044fd0b275b1113) (cherry picked from commit 9247594970c9db109e3e6ca3fd87485450df921c) (cherry picked from commit 9db1158a487e00e588810459fe402cc2ccea43f7) (cherry picked from commit 3b36b77d87a90fbea03fc16638657e19328ccedc) (cherry picked from commit 162fa1d8ae3753dd8ee51698555e495f2c63d925) (cherry picked from commit d03d0afbb565c8bc8b723e10c8c70b69e7af7b80) (cherry picked from commit 7b8f92f7871b838bc2eefa34e7dc48bcd141d1d5) (cherry picked from commit 035abca9691d33e319062325dae402da66683c43) (cherry picked from commit a8fbf6bb56046665cb2cde0ffcc753f56b2f0f2d) (cherry picked from commit 3be681d037b07880236cae1aa70245e5eb4d1497) (cherry picked from commit 7e5d471c832ee3fea378ecc97835b038bd55a8e1) (cherry picked from commit 323801d935fec2c6d460192b62fa12b5204da76e) (cherry picked from commit 3fdfe4bfea623111f1f97e50b71b98a63c8b38e7) (cherry picked from commit 58a07421a4508ca298c1c3a45d33d49737ee98d8) (cherry picked from commit dbb71a4c8502b640857d3500dda12ab4b5d74b29) (cherry picked from commit d442113520d21149e155d1e62bbeb6a35a6aec08) (cherry picked from commit d3329f01f8c7145c422b159509f544ec83604a51) (cherry picked from commit 069a1d68b856898e2913d1d4456deb7f1e976a6c) (cherry picked from commit 14919e609a4dd9ae9ca19880ffc459def8bea273) (cherry picked from commit 2b139fed20bc2f78cd0edf35f3d226ad055f60b5) (cherry picked from commit 57ee24202472a60c373909dfea9158d057e9cebf) (cherry picked from commit b46762432f0bdb0c017e752e53a7281c0945af60)
2023-05-16 07:58:42 +02:00
if true {
t.Skip("redis-server not found in Forgejo test yet")
Rewrite queue (#24505) # ⚠️ Breaking Many deprecated queue config options are removed (actually, they should have been removed in 1.18/1.19). If you see the fatal message when starting Gitea: "Please update your app.ini to remove deprecated config options", please follow the error messages to remove these options from your app.ini. Example: ``` 2023/05/06 19:39:22 [E] Removed queue option: `[indexer].ISSUE_INDEXER_QUEUE_TYPE`. Use new options in `[queue.issue_indexer]` 2023/05/06 19:39:22 [E] Removed queue option: `[indexer].UPDATE_BUFFER_LEN`. Use new options in `[queue.issue_indexer]` 2023/05/06 19:39:22 [F] Please update your app.ini to remove deprecated config options ``` Many options in `[queue]` are are dropped, including: `WRAP_IF_NECESSARY`, `MAX_ATTEMPTS`, `TIMEOUT`, `WORKERS`, `BLOCK_TIMEOUT`, `BOOST_TIMEOUT`, `BOOST_WORKERS`, they can be removed from app.ini. # The problem The old queue package has some legacy problems: * complexity: I doubt few people could tell how it works. * maintainability: Too many channels and mutex/cond are mixed together, too many different structs/interfaces depends each other. * stability: due to the complexity & maintainability, sometimes there are strange bugs and difficult to debug, and some code doesn't have test (indeed some code is difficult to test because a lot of things are mixed together). * general applicability: although it is called "queue", its behavior is not a well-known queue. * scalability: it doesn't seem easy to make it work with a cluster without breaking its behaviors. It came from some very old code to "avoid breaking", however, its technical debt is too heavy now. It's a good time to introduce a better "queue" package. # The new queue package It keeps using old config and concept as much as possible. * It only contains two major kinds of concepts: * The "base queue": channel, levelqueue, redis * They have the same abstraction, the same interface, and they are tested by the same testing code. 
* The "WokerPoolQueue", it uses the "base queue" to provide "worker pool" function, calls the "handler" to process the data in the base queue. * The new code doesn't do "PushBack" * Think about a queue with many workers, the "PushBack" can't guarantee the order for re-queued unhandled items, so in new code it just does "normal push" * The new code doesn't do "pause/resume" * The "pause/resume" was designed to handle some handler's failure: eg: document indexer (elasticsearch) is down * If a queue is paused for long time, either the producers blocks or the new items are dropped. * The new code doesn't do such "pause/resume" trick, it's not a common queue's behavior and it doesn't help much. * If there are unhandled items, the "push" function just blocks for a few seconds and then re-queue them and retry. * The new code doesn't do "worker booster" * Gitea's queue's handlers are light functions, the cost is only the go-routine, so it doesn't make sense to "boost" them. * The new code only use "max worker number" to limit the concurrent workers. * The new "Push" never blocks forever * Instead of creating more and more blocking goroutines, return an error is more friendly to the server and to the end user. There are more details in code comments: eg: the "Flush" problem, the strange "code.index" hanging problem, the "immediate" queue problem. Almost ready for review. TODO: * [x] add some necessary comments during review * [x] add some more tests if necessary * [x] update documents and config options * [x] test max worker / active worker * [x] re-run the CI tasks to see whether any test is flaky * [x] improve the `handleOldLengthConfiguration` to provide more friendly messages * [x] fine tune default config values (eg: length?) ## Code coverage: ![image](https://user-images.githubusercontent.com/2114189/236620635-55576955-f95d-4810-b12f-879026a3afdf.png)
2023-05-08 13:49:59 +02:00
return
}
assert.NoError(t, redisServer.Start())
if !assert.True(t, waitRedisReady("redis://127.0.0.1:6379/0", 5*time.Second), "start redis-server") {
return
}
}
testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", setting.QueueSettings{Length: 10}), false)
testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", setting.QueueSettings{Length: 10}), true)
}