9.4.9.1 توضیحات #
الگوی Producer-Consumer یکی از الگوهای بنیادی و بسیار پرکاربرد در برنامهنویسی همزمان (concurrent) با زبان Go است که امکان تولید و مصرف داده به صورت همزمان و ایمن را فراهم میکند. در این الگو، معمولاً یک یا چند goroutine به عنوان تولیدکننده (Producer) وظیفه تولید داده، رویداد یا پیام را بر عهده دارند و دادههای تولیدی را از طریق یک کانال (channel) به goroutineهای دیگر که نقش مصرفکننده (Consumer) را دارند، ارسال میکنند. مصرفکنندهها نیز به صورت موازی دادههای دریافتی را از کانال خوانده و پردازش میکنند. این جداسازی نقش تولید و مصرف، باعث میشود بخشهای مختلف برنامه به صورت مستقل و همزمان عمل کرده و در عین حال از race condition و مشکلات همزمانی جلوگیری شود.
کانالها در Go نقش پل ارتباطی ایمن و همزمان بین goroutineها را بازی میکنند. معمولاً برای این الگو از یک کانال یکطرفه (unidirectional) بافردار یا بدون بافر استفاده میشود تا دادهها به شکل صف (queue) از تولیدکننده به مصرفکننده منتقل شوند. هر چند، در معماریهای پیچیدهتر گاهی ممکن است از دو کانال (یکی برای داده و دیگری برای ارسال acknowledgment یا سیگنال برگشتی) بهره گرفته شود، اما در اکثر سناریوهای استاندارد کانال یکطرفه کفایت میکند. کانالهای Go بهطور خودکار هماهنگی بین goroutineها را برقرار میکنند؛ یعنی اگر کانال پر باشد، تولیدکننده منتظر میماند تا مصرفکننده داده را مصرف کند، و اگر کانال خالی باشد، مصرفکننده منتظر میماند تا داده جدید برسد.
این الگو برای حل بسیاری از مسائل دنیای واقعی ایدهآل است؛ از صف بندی و پردازش موازی jobها گرفته تا انتقال داده بین بخشهای مختلف یک سامانه، جمعآوری لاگ، پردازش همزمان پیامها و حتی مدیریت صف درخواستهای ورودی به سرویسها. با استفاده از الگوی Producer-Consumer، نه تنها بهرهوری و سرعت برنامه افزایش مییابد، بلکه کنترل جریان داده، پایداری و مقیاسپذیری سیستم نیز به طرز چشمگیری بهبود پیدا میکند. پیادهسازی صحیح این الگو در Go باعث میشود برنامهنویس بدون نگرانی از مشکلات همزمانی، بخشهای مختلف سیستم را به صورت مستقل توسعه داده و بهینه کند.
9.4.9.2 دیاگرام #
9.4.9.3 نمونه کد #
1package main
2
3import (
4 "fmt"
5 "math/rand"
6 "sync"
7 "time"
8)
9
10func producer(ch chan<- int, count int, wg *sync.WaitGroup) {
11 defer wg.Done()
12 for i := 0; i < count; i++ {
13 n := rand.Intn(100)
14 ch <- n
15 fmt.Println("Produced:", n)
16 time.Sleep(100 * time.Millisecond)
17 }
18 close(ch) // سیگنال پایان تولید به مصرفکنندهها
19}
20
21func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
22 defer wg.Done()
23 for n := range ch {
24 fmt.Printf("Consumer %d: Consumed %d\n", id, n)
25 time.Sleep(200 * time.Millisecond)
26 }
27 fmt.Printf("Consumer %d: Finished\n", id)
28}
29
30func main() {
31 rand.Seed(time.Now().UnixNano())
32 ch := make(chan int, 10) // کانال بافر دار
33
34 var wg sync.WaitGroup
35 produceCount := 20
36
37 wg.Add(1)
38 go producer(ch, produceCount, &wg)
39
40 consumerCount := 2
41 for i := 1; i <= consumerCount; i++ {
42 wg.Add(1)
43 go consumer(i, ch, &wg)
44 }
45
46 wg.Wait()
47 fmt.Println("All done.")
48}
1$ go run main.go
2Produced: 96
3Consumer 2: Consumed 96
4Produced: 55
5Consumer 1: Consumed 55
6Consumer 2: Consumed 36
7Produced: 36
8Produced: 45
9Consumer 1: Consumed 45
10Produced: 61
11Consumer 2: Consumed 61
12Produced: 6
13Consumer 1: Consumed 6
14Produced: 12
15Consumer 2: Consumed 12
16Produced: 45
17Consumer 1: Consumed 45
18Produced: 84
19Consumer 2: Consumed 84
20Produced: 48
21Consumer 1: Consumed 48
22Produced: 78
23Consumer 2: Consumed 78
24Produced: 44
25Consumer 1: Consumed 44
26Produced: 18
27Consumer 2: Consumed 18
28Produced: 92
29Consumer 1: Consumed 92
30Produced: 9
31Consumer 2: Consumed 9
32Produced: 5
33Consumer 1: Consumed 5
34Produced: 32
35Consumer 2: Consumed 32
36Produced: 0
37Consumer 1: Consumed 0
38Produced: 30
39Consumer 2: Consumed 30
40Produced: 86
41Consumer 1: Consumed 86
42Consumer 2: Finished
43Consumer 1: Finished
44All done.
در نسخه بهبود یافتهی مثال Producer-Consumer، تلاش شده تمام چالشهای همزمانی و ضعفهای مدیریت goroutineها به صورت حرفهای و idiomatic در زبان Go برطرف شود. در این ساختار، یک goroutine به عنوان تولیدکننده (Producer) تعریف شده که به تعداد مشخص (مثلاً ۲۰ عدد) داده تصادفی تولید و وارد یک کانال بافردار (مثلاً با ظرفیت ۱۰) میکند. استفاده از کانال بافر دار باعث میشود تولید و مصرف دادهها تا حدی decoupled باشند، یعنی اگر مصرفکنندهها لحظهای کند شوند، تولیدکننده تا پر شدن بافر بدون توقف میتواند تولید کند.
پس از تولید همه دادهها، producer کانال را میبندد. این کار بسیار مهم است؛ چون با بسته شدن کانال، حلقهی for در goroutineهای مصرفکننده به صورت تمیز و بدون خطا تمام میشود و پیام “Finished” برای هر مصرفکننده چاپ میگردد. مصرفکنندهها به صورت worker pool پیادهسازی شدهاند؛ یعنی هر دادهای که از کانال خارج شود به طور تصادفی توسط یکی از مصرفکنندهها پردازش میشود (division of labor) و هرکدام بعد از اتمام کارشان (یعنی زمانی که کانال بسته و همه دادهها مصرف شده باشد) خارج میشوند.
برای جلوگیری از goroutine leak و تضمین پایان تمیز همه goroutineها، از sync.WaitGroup استفاده شده است. با اضافه کردن یک مقدار به WaitGroup قبل از هر goroutine و کم کردن آن هنگام اتمام goroutine (defer wg.Done()
)، میتوان مطمئن بود که برنامه فقط زمانی پایان مییابد که همه تولید و مصرفها به طور کامل انجام شدهاند.
این معماری، همزمانی ایمن، کنترل شده و مقیاسپذیر بین بخش تولید و مصرف داده را فراهم میکند. در چنین سیستمی، میتوانید تعداد تولیدکننده یا مصرفکننده را به راحتی افزایش دهید یا منطق هر بخش را تغییر دهید (مثلاً پردازشهای سنگینتر، ارسال درخواستهای شبکه و…) بدون اینکه نیاز به تغییر بنیادی در ساختار ارتباط بین آنها باشد. استفاده صحیح از کانالها، WaitGroup و مدیریت پایان graceful باعث میشود این الگو به راحتی در پروژههای تولیدی، سرویسهای بلادرنگ، صفهای پیام و هر نوع معماری concurrent مدرن قابل استفاده باشد.
9.4.9.4 کاربردها #
- خط لوله پردازش داده (Data Processing Pipeline): با استفاده از این الگو میتوانید معماری خط لوله یا pipeline ایجاد کنید که هر مرحله از پردازش بهعنوان یک producer یا consumer عمل میکند. به عنوان مثال، ابتدا داده خام توسط یک goroutine تولید و از طریق کانال به مرحله بعد (مثل پاکسازی، تحلیل، تبدیل یا ذخیرهسازی) ارسال میشود و هر مرحله میتواند نقش producer برای مرحله بعد و consumer برای مرحله قبل را داشته باشد. این مدل توسعه، تست و مقیاسپذیری سیستمهای دادهمحور را بسیار ساده و حرفهای میکند.
- سیستمهای Logging و Monitoring: در معماریهای واقعی، ثبت لاگ یا مانیتورینگ سیستم نیازمند جدا بودن تولید و ذخیرهسازی لاگ است تا بخش اصلی سیستم کند نشود. با این الگو، برنامه اصلی (یا بخشهای مختلف آن) بهعنوان producer لاگها را روی کانال میفرستد و یک یا چند consumer این دادهها را از کانال میخوانند و آنها را در فایل، پایگاه داده یا سرور مانیتورینگ ثبت میکنند. این کار از blocking شدن منطق اصلی و تجمع بیمورد داده جلوگیری میکند و ثبت لاگ را مقیاسپذیر و غیرمسدودکننده میکند.
- استریم داده و Real-time Processing: در سناریوهایی مانند پردازش رویدادهای لحظهای (event streaming)، انتقال دادههای IoT یا سیستمهای تحلیل آنلاین (online analytics)، این الگو به شما امکان میدهد دادههای تولیدشده را در لحظه به مصرفکنندگان منتقل کنید. این مصرفکنندگان میتوانند انواع عملیات مانند فیلتر، آمار، آلارم، ذخیرهسازی لحظهای یا پردازش پیشرفته روی داده انجام دهند. Producer-Consumer ستون فقرات معماری بسیاری از سرویسهای real-time و message queue است.
- واسطهگری بین سیستمها (Integration & Decoupling): این الگو به شما اجازه میدهد بخشهای مختلف یک سیستم یا حتی سرویسهای جدا را از طریق کانالهای ارتباطی به هم متصل کنید بدون آنکه وابستگی مستقیم یا coupling بالا داشته باشند. هر producer میتواند در یک microservice یا process مجزا باشد و داده را به صف ارسال کند و هر consumer در سوی دیگر مسئول مصرف و پردازش باشد.