9.4.9 الگو Producer-Consumer

9.4.9 الگو Producer-Consumer

9.4.9.1 توضیحات #

الگوی Producer-Consumer یکی از الگوهای بنیادی و بسیار پرکاربرد در برنامه‌نویسی همزمان (concurrent) با زبان Go است که امکان تولید و مصرف داده به صورت همزمان و ایمن را فراهم می‌کند. در این الگو، معمولاً یک یا چند goroutine به عنوان تولیدکننده (Producer) وظیفه تولید داده، رویداد یا پیام را بر عهده دارند و داده‌های تولیدی را از طریق یک کانال (channel) به goroutineهای دیگر که نقش مصرف‌کننده (Consumer) را دارند، ارسال می‌کنند. مصرف‌کننده‌ها نیز به صورت موازی داده‌های دریافتی را از کانال خوانده و پردازش می‌کنند. این جداسازی نقش تولید و مصرف، باعث می‌شود بخش‌های مختلف برنامه به صورت مستقل و همزمان عمل کرده و در عین حال از race condition و مشکلات همزمانی جلوگیری شود.

کانال‌ها در Go نقش پل ارتباطی ایمن و همزمان بین goroutineها را بازی می‌کنند. معمولاً برای این الگو از یک کانال یک‌طرفه (unidirectional) بافر‌دار یا بدون بافر استفاده می‌شود تا داده‌ها به شکل صف (queue) از تولیدکننده به مصرف‌کننده منتقل شوند. هر چند، در معماری‌های پیچیده‌تر گاهی ممکن است از دو کانال (یکی برای داده و دیگری برای ارسال acknowledgment یا سیگنال برگشتی) بهره گرفته شود، اما در اکثر سناریوهای استاندارد کانال یک‌طرفه کفایت می‌کند. کانال‌های Go به‌طور خودکار هماهنگی بین goroutineها را برقرار می‌کنند؛ یعنی اگر کانال پر باشد، تولیدکننده منتظر می‌ماند تا مصرف‌کننده داده را مصرف کند، و اگر کانال خالی باشد، مصرف‌کننده منتظر می‌ماند تا داده جدید برسد.

این الگو برای حل بسیاری از مسائل دنیای واقعی ایده‌آل است؛ از صف بندی و پردازش موازی jobها گرفته تا انتقال داده بین بخش‌های مختلف یک سامانه، جمع‌آوری لاگ، پردازش همزمان پیام‌ها و حتی مدیریت صف درخواست‌های ورودی به سرویس‌ها. با استفاده از الگوی Producer-Consumer، نه تنها بهره‌وری و سرعت برنامه افزایش می‌یابد، بلکه کنترل جریان داده، پایداری و مقیاس‌پذیری سیستم نیز به طرز چشمگیری بهبود پیدا می‌کند. پیاده‌سازی صحیح این الگو در Go باعث می‌شود برنامه‌نویس بدون نگرانی از مشکلات همزمانی، بخش‌های مختلف سیستم را به صورت مستقل توسعه داده و بهینه کند.

9.4.9.2 دیاگرام #

flowchart TD subgraph Producers direction TB P1["Producer 1"] P2["Producer 2"] Pn["..."] end subgraph Channel Chan["Channel (Buffer/Queue)"] end subgraph Consumers direction TB C1["Consumer 1"] C2["Consumer 2"] Cn["..."] end P1 -- "Send Data" --> Chan P2 -- "Send Data" --> Chan Pn -- "Send Data" --> Chan Chan -- "Receive Data" --> C1 Chan -- "Receive Data" --> C2 Chan -- "Receive Data" --> Cn

9.4.9.3 نمونه کد #

 1package main
 2
 3import (
 4	"fmt"
 5	"math/rand"
 6	"sync"
 7	"time"
 8)
 9
10func producer(ch chan<- int, count int, wg *sync.WaitGroup) {
11	defer wg.Done()
12	for i := 0; i < count; i++ {
13		n := rand.Intn(100)
14		ch <- n
15		fmt.Println("Produced:", n)
16		time.Sleep(100 * time.Millisecond)
17	}
18	close(ch) // سیگنال پایان تولید به مصرف‌کننده‌ها
19}
20
21func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
22	defer wg.Done()
23	for n := range ch {
24		fmt.Printf("Consumer %d: Consumed %d\n", id, n)
25		time.Sleep(200 * time.Millisecond)
26	}
27	fmt.Printf("Consumer %d: Finished\n", id)
28}
29
30func main() {
31	rand.Seed(time.Now().UnixNano())
32	ch := make(chan int, 10) // کانال بافر دار
33
34	var wg sync.WaitGroup
35	produceCount := 20
36
37	wg.Add(1)
38	go producer(ch, produceCount, &wg)
39
40	consumerCount := 2
41	for i := 1; i <= consumerCount; i++ {
42		wg.Add(1)
43		go consumer(i, ch, &wg)
44	}
45
46	wg.Wait()
47	fmt.Println("All done.")
48}
 1$ go run main.go
 2Produced: 96
 3Consumer 2: Consumed 96
 4Produced: 55
 5Consumer 1: Consumed 55
 6Consumer 2: Consumed 36
 7Produced: 36
 8Produced: 45
 9Consumer 1: Consumed 45
10Produced: 61
11Consumer 2: Consumed 61
12Produced: 6
13Consumer 1: Consumed 6
14Produced: 12
15Consumer 2: Consumed 12
16Produced: 45
17Consumer 1: Consumed 45
18Produced: 84
19Consumer 2: Consumed 84
20Produced: 48
21Consumer 1: Consumed 48
22Produced: 78
23Consumer 2: Consumed 78
24Produced: 44
25Consumer 1: Consumed 44
26Produced: 18
27Consumer 2: Consumed 18
28Produced: 92
29Consumer 1: Consumed 92
30Produced: 9
31Consumer 2: Consumed 9
32Produced: 5
33Consumer 1: Consumed 5
34Produced: 32
35Consumer 2: Consumed 32
36Produced: 0
37Consumer 1: Consumed 0
38Produced: 30
39Consumer 2: Consumed 30
40Produced: 86
41Consumer 1: Consumed 86
42Consumer 2: Finished
43Consumer 1: Finished
44All done.

در نسخه بهبود یافته‌ی مثال Producer-Consumer، تلاش شده تمام چالش‌های همزمانی و ضعف‌های مدیریت goroutineها به صورت حرفه‌ای و idiomatic در زبان Go برطرف شود. در این ساختار، یک goroutine به عنوان تولیدکننده (Producer) تعریف شده که به تعداد مشخص (مثلاً ۲۰ عدد) داده تصادفی تولید و وارد یک کانال بافر‌دار (مثلاً با ظرفیت ۱۰) می‌کند. استفاده از کانال بافر دار باعث می‌شود تولید و مصرف داده‌ها تا حدی decoupled باشند، یعنی اگر مصرف‌کننده‌ها لحظه‌ای کند شوند، تولیدکننده تا پر شدن بافر بدون توقف می‌تواند تولید کند.

پس از تولید همه داده‌ها، producer کانال را می‌بندد. این کار بسیار مهم است؛ چون با بسته شدن کانال، حلقه‌ی for در goroutineهای مصرف‌کننده به صورت تمیز و بدون خطا تمام می‌شود و پیام “Finished” برای هر مصرف‌کننده چاپ می‌گردد. مصرف‌کننده‌ها به صورت worker pool پیاده‌سازی شده‌اند؛ یعنی هر داده‌ای که از کانال خارج شود به طور تصادفی توسط یکی از مصرف‌کننده‌ها پردازش می‌شود (division of labor) و هرکدام بعد از اتمام کارشان (یعنی زمانی که کانال بسته و همه داده‌ها مصرف شده باشد) خارج می‌شوند.

برای جلوگیری از goroutine leak و تضمین پایان تمیز همه goroutineها، از sync.WaitGroup استفاده شده است. با اضافه کردن یک مقدار به WaitGroup قبل از هر goroutine و کم کردن آن هنگام اتمام goroutine (defer wg.Done())، می‌توان مطمئن بود که برنامه فقط زمانی پایان می‌یابد که همه تولید و مصرف‌ها به طور کامل انجام شده‌اند.

این معماری، همزمانی ایمن، کنترل شده و مقیاس‌پذیر بین بخش تولید و مصرف داده را فراهم می‌کند. در چنین سیستمی، می‌توانید تعداد تولیدکننده یا مصرف‌کننده را به راحتی افزایش دهید یا منطق هر بخش را تغییر دهید (مثلاً پردازش‌های سنگین‌تر، ارسال درخواست‌های شبکه و…) بدون اینکه نیاز به تغییر بنیادی در ساختار ارتباط بین آن‌ها باشد. استفاده صحیح از کانال‌ها، WaitGroup و مدیریت پایان graceful باعث می‌شود این الگو به راحتی در پروژه‌های تولیدی، سرویس‌های بلادرنگ، صف‌های پیام و هر نوع معماری concurrent مدرن قابل استفاده باشد.

9.4.9.4 کاربردها #

  • خط لوله پردازش داده (Data Processing Pipeline): با استفاده از این الگو می‌توانید معماری خط لوله یا pipeline ایجاد کنید که هر مرحله از پردازش به‌عنوان یک producer یا consumer عمل می‌کند. به عنوان مثال، ابتدا داده خام توسط یک goroutine تولید و از طریق کانال به مرحله بعد (مثل پاک‌سازی، تحلیل، تبدیل یا ذخیره‌سازی) ارسال می‌شود و هر مرحله می‌تواند نقش producer برای مرحله بعد و consumer برای مرحله قبل را داشته باشد. این مدل توسعه، تست و مقیاس‌پذیری سیستم‌های داده‌محور را بسیار ساده و حرفه‌ای می‌کند.
  • سیستم‌های Logging و Monitoring: در معماری‌های واقعی، ثبت لاگ یا مانیتورینگ سیستم نیازمند جدا بودن تولید و ذخیره‌سازی لاگ است تا بخش اصلی سیستم کند نشود. با این الگو، برنامه اصلی (یا بخش‌های مختلف آن) به‌عنوان producer لاگ‌ها را روی کانال می‌فرستد و یک یا چند consumer این داده‌ها را از کانال می‌خوانند و آن‌ها را در فایل، پایگاه داده یا سرور مانیتورینگ ثبت می‌کنند. این کار از blocking شدن منطق اصلی و تجمع بی‌مورد داده جلوگیری می‌کند و ثبت لاگ را مقیاس‌پذیر و غیرمسدودکننده می‌کند.
  • استریم داده و Real-time Processing: در سناریوهایی مانند پردازش رویدادهای لحظه‌ای (event streaming)، انتقال داده‌های IoT یا سیستم‌های تحلیل آنلاین (online analytics)، این الگو به شما امکان می‌دهد داده‌های تولیدشده را در لحظه به مصرف‌کنندگان منتقل کنید. این مصرف‌کنندگان می‌توانند انواع عملیات مانند فیلتر، آمار، آلارم، ذخیره‌سازی لحظه‌ای یا پردازش پیشرفته روی داده انجام دهند. Producer-Consumer ستون فقرات معماری بسیاری از سرویس‌های real-time و message queue است.
  • واسطه‌گری بین سیستم‌ها (Integration & Decoupling): این الگو به شما اجازه می‌دهد بخش‌های مختلف یک سیستم یا حتی سرویس‌های جدا را از طریق کانال‌های ارتباطی به هم متصل کنید بدون آنکه وابستگی مستقیم یا coupling بالا داشته باشند. هر producer می‌تواند در یک microservice یا process مجزا باشد و داده را به صف ارسال کند و هر consumer در سوی دیگر مسئول مصرف و پردازش باشد.