9.4.13.1 توضیحات #
الگوی Subscription (یا Pub-Sub / Observer Pattern) یکی از پرکاربردترین الگوها در معماریهای رویداد-محور و همزمان (event-driven & concurrent) است که امکان ثبتنام (subscribe) یک یا چند مصرفکننده (consumer) را برای دریافت خودکار دادههای جدید از یک منبع یا سرویس فراهم میکند. در این الگو، یک یا چند مصرفکننده به یک “آدرس” یا منبع اشتراک (مثلاً یک topic، کانال یا event source) متصل میشوند و هر زمان که داده یا رویداد جدیدی منتشر شد (publish)، اطلاعات به طور خودکار و بینیاز از polling مکرر به همهی مصرفکنندههای عضو ارسال میشود.
در زبان Go، پیادهسازی Subscription اغلب با استفاده از channelها و goroutineها انجام میشود: یک goroutine به عنوان publisher وظیفه تولید و ارسال دادهها را دارد، و هر consumer میتواند با subscribe کردن (ثبت نام) در یک channel مشترک، دادههای جدید را دریافت کند. این مدل به شما اجازه میدهد تا به سادگی چندین consumer را همزمان به یک منبع داده وصل کنید و مدیریت رویدادهای همزمان، صفهای پیام (message queue)، بروزرسانیهای لحظهای، یا حتی سیستمهای نوتیفیکیشن را به صورت concurrent و بدون بلاک شدن یا پیچیدگی زیاد پیادهسازی کنید.
کاربردهای Subscription در Go بسیار گسترده است: از مدیریت پیامهای real-time (مثل ارسال اعلان در اپلیکیشنها)، اتصال میکروسرویسها، پیادهسازی سیستمهای event sourcing و message broker گرفته تا جمعآوری لاگهای زنده یا حتی مانیتورینگ سرویسهای حیاتی. مزیت اصلی این الگو جداسازی کامل بین تولیدکننده و مصرفکننده (decoupling)، مقیاسپذیری، و سادگی توسعه و تست در معماریهای concurrent و reactive است.
9.4.13.2 دیاگرام #
9.4.13.3 نمونه کد #
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io/ioutil"
8 "log"
9 "net/http"
10 "os"
11 "time"
12)
13
14const exampleAPIAddress = "https://random-data-api.com/api/stripe/random_stripe"
15
16type Card struct {
17 Id uint `json:"id"`
18 Uid string `json:"uid"`
19 ValidCard string `json:"valid_card"`
20 Token string `json:"token"`
21 InvalidCard string `json:"invalid_card"`
22 Month string `json:"month"`
23 Year string `json:"year"`
24 CCV string `json:"ccv"`
25 CCVAmex string `json:"ccv_amex"`
26}
27
28type Subscription interface {
29 Updates() <-chan Card
30}
31
32type Fetcher interface {
33 Fetch() (Card, error)
34}
35
36type sub struct {
37 fetcher Fetcher
38 updates chan Card
39}
40
41type fetcher struct {
42 url string
43}
44
45type fetchResult struct {
46 fetchedCard Card
47 err error
48}
49
50// NewSubscription create subscription for fetch data per freq time in second
51func NewSubscription(ctx context.Context, fetcher Fetcher, freq uint) Subscription {
52 s := &sub{
53 fetcher: fetcher,
54 updates: make(chan Card),
55 }
56 go s.serve(ctx, freq)
57 return s
58}
59
60func NewFetcher(url string) Fetcher {
61 return &fetcher{
62 url: url,
63 }
64}
65
66func (f *fetcher) Fetch() (Card, error) {
67 return requestAPI(f.url)
68}
69
70func (s *sub) serve(ctx context.Context, freq uint) {
71 ticker := time.NewTicker(time.Duration(freq) * time.Second)
72 done := make(chan fetchResult, 1)
73
74 var (
75 fetchedCard Card
76 fetchResponseStream chan Card
77 pending bool
78 )
79
80 for {
81
82 if pending {
83 fetchResponseStream = s.updates
84 } else {
85 fetchResponseStream = nil
86 }
87
88 select {
89 case <-ticker.C:
90 if pending {
91 break
92 }
93 go func() {
94 fetched, err := s.fetcher.Fetch()
95 done <- fetchResult{fetched, err}
96 }()
97 case result := <-done:
98 fetchedCard = result.fetchedCard
99 if result.err != nil {
100 log.Printf("fetch got error %v", result.err)
101 break
102 }
103 pending = true
104 case fetchResponseStream <- fetchedCard:
105 pending = false
106 case <-ctx.Done():
107 return
108 }
109 }
110}
111
112func (s *sub) Updates() <-chan Card {
113 return s.updates
114}
115
116func requestAPI(url string) (Card, error) {
117 card := Card{}
118 req, err := http.NewRequest(http.MethodGet, url, nil)
119 if err != nil {
120 return Card{}, err
121 }
122 res, err := http.DefaultClient.Do(req)
123 if err != nil {
124 return Card{}, err
125 }
126 body, err := ioutil.ReadAll(res.Body)
127 if err != nil {
128 return Card{}, err
129 }
130 if err := json.Unmarshal(body, &card); err != nil {
131 return Card{}, err
132 }
133 return card, nil
134}
135
136func main() {
137 ctx, cancel := context.WithCancel(context.Background())
138 sub := NewSubscription(ctx, NewFetcher(exampleAPIAddress), 3)
139
140 time.AfterFunc(1*time.Minute, func() {
141 cancel()
142 log.Println("canceled subscription task")
143 os.Exit(0)
144 })
145
146 for card := range sub.Updates() {
147 fmt.Println(card)
148 }
149}
1$ go run main.go
2{4643 add2475a-ed64-4039-831d-0e95469752d9 371449635398431 tok_mastercard_debit 4000000000000101 01 2024 920 7875}
3{6992 a89e3d71-785a-4d37-9639-be3ce7534257 2223003122003222 tok_discover 4000000000000069 11 2024 660 5241}
4{9287 f665526e-1b34-46f5-9d9d-50362631ed0f 5200828282828210 tok_mastercard_debit 4000000000000036 05 2026 993 6272}
5{4956 e8ae8e75-0ff2-42e8-921c-5cc438d64fac 3566002020360505 tok_amex 4000000000000044 10 2024 371 9989}
6{1193 954d1b36-829b-4726-bbb7-0f5f01b3dd40 6011000990139424 tok_mastercard_debit 4000000000000341 12 2026 331 5119}
در این مثال، ابتدا یک اینترفیس به نام Subscription تعریف شده است که متدی به نام Updates
دارد و کانالی از نوع Card
را برمیگرداند. این کانال نقش مسیر ارتباطی را بین تولیدکننده و مصرفکننده ایفا میکند، به طوری که مصرفکننده میتواند به طور همزمان و غیرمسدود دادههای جدید را دریافت کند. همچنین اینترفیس Fetcher طراحی شده که وظیفهی فراخوانی API و دریافت دادهها را بر عهده دارد و متد Fetch
را ارائه میدهد. این تفکیک وظایف باعث میشود کد قابلیت توسعه و تست بیشتری داشته باشد.
تابع NewSubscription
به عنوان سازنده Subscription عمل میکند؛ این تابع یک struct از نوع sub
ایجاد میکند که حاوی fetcher و یک کانال updates
است. سپس متد serve
به صورت یک goroutine اجرا میشود تا عملیات fetch را در فواصل زمانی مشخص (که با پارامتر freq تعیین میشود) تکرار کند. درون این متد از time.Ticker
برای زمانبندی دقیق استفاده شده است تا به صورت منظم و بدون ایجاد سربار اضافی، دادهها را از API فراخوانی کند و در صورت دریافت موفقیتآمیز، آنها را در کانال منتشر نماید. همچنین با کمک متغیر pending
اطمینان حاصل میشود که یک fetch جدید تا قبل از اتمام fetch قبلی آغاز نشود، بنابراین از فشار بیش از حد به سرویس جلوگیری میشود.
مصرفکنندگان دادهها از طریق متد Updates
به کانال updates
دسترسی دارند و به محض دریافت دادههای جدید، میتوانند پردازش خود را آغاز کنند. استفاده از context.Context
در این ساختار اجازه میدهد که در هر زمان عملیات fetch به صورت ایمن لغو شود و goroutine مربوطه به سرعت و بدون باقی ماندن در حالت بلاکشده خاتمه یابد. این طراحی باعث میشود که برنامه همزمانی بهینهای داشته باشد، منابع به خوبی مدیریت شود و کد قابلیت خوانایی، توسعه و تست آسان را حفظ کند.
در کل، این الگو ترکیبی از بهترین شیوههای Go در مدیریت جریان دادههای ناهمزمان، کنترل concurrency و ارتباط بین goroutineها است که برای دریافت دادههای زنده از API یا منابع خارجی بسیار مناسب است. با چنین معماری میتوان سیستمهایی تولید کرد که علاوه بر مقیاسپذیری بالا، مقاوم در برابر خطا و قابل کنترل نیز باشند.
9.4.13.4 کاربردها #
- دریافت اطلاعات از تولیدکنندهها (Publisher) یا سیستمهای Pub/Sub:
الگوی Subscription به شما این امکان را میدهد که به سادگی به یک یا چند منبع داده (مانند سرویسهای پیامرسان، سیستمهای صف پیام، یا هر منبعی که به صورت publish/subscribe کار میکند) متصل شوید و دادههای جدید را به صورت همزمان و غیرمسدود دریافت کنید. این کار باعث میشود مصرفکنندهها به صورت real-time یا نزدیک به real-time اطلاعات را دریافت و پردازش نمایند و از پیچیدگیهای مدیریت اتصال یا polling مکرر بینیاز شوند. - همگامسازی دادهها از APIهای خارجی:
در بسیاری از برنامهها نیاز است دادهها یا وضعیت از سرویسهای خارجی (مانند RESTful APIها، سرویسهای ابری یا سیستمهای تحلیلی) به صورت دورهای یا بر اساس رویداد بهروزرسانی شوند. الگوی Subscription این امکان را فراهم میکند که بتوانید با تعریف یک سازوکار هوشمند برای دریافت بهروزرسانیها، مصرف دادهها را ساده، پایدار و بهینه کنید. این الگو به خصوص در سناریوهای real-time dashboards، اطلاعرسانی لحظهای و هماهنگسازی دادههای توزیعشده کاربرد فراوان دارد.