9.4.13 الگو Subscription

9.4.13 الگو Subscription

9.4.13.1 توضیحات #

الگوی Subscription (یا Pub-Sub / Observer Pattern) یکی از پرکاربردترین الگوها در معماری‌های رویداد-محور و همزمان (event-driven & concurrent) است که امکان ثبت‌نام (subscribe) یک یا چند مصرف‌کننده (consumer) را برای دریافت خودکار داده‌های جدید از یک منبع یا سرویس فراهم می‌کند. در این الگو، یک یا چند مصرف‌کننده به یک “آدرس” یا منبع اشتراک (مثلاً یک topic، کانال یا event source) متصل می‌شوند و هر زمان که داده یا رویداد جدیدی منتشر شد (publish)، اطلاعات به طور خودکار و بی‌نیاز از polling مکرر به همه‌ی مصرف‌کننده‌های عضو ارسال می‌شود.

در زبان Go، پیاده‌سازی Subscription اغلب با استفاده از channelها و goroutineها انجام می‌شود: یک goroutine به عنوان publisher وظیفه تولید و ارسال داده‌ها را دارد، و هر consumer می‌تواند با subscribe کردن (ثبت نام) در یک channel مشترک، داده‌های جدید را دریافت کند. این مدل به شما اجازه می‌دهد تا به سادگی چندین consumer را همزمان به یک منبع داده وصل کنید و مدیریت رویدادهای همزمان، صف‌های پیام (message queue)، بروزرسانی‌های لحظه‌ای، یا حتی سیستم‌های نوتیفیکیشن را به صورت concurrent و بدون بلاک شدن یا پیچیدگی زیاد پیاده‌سازی کنید.

کاربردهای Subscription در Go بسیار گسترده است: از مدیریت پیام‌های real-time (مثل ارسال اعلان در اپلیکیشن‌ها)، اتصال میکروسرویس‌ها، پیاده‌سازی سیستم‌های event sourcing و message broker گرفته تا جمع‌آوری لاگ‌های زنده یا حتی مانیتورینگ سرویس‌های حیاتی. مزیت اصلی این الگو جداسازی کامل بین تولیدکننده و مصرف‌کننده (decoupling)، مقیاس‌پذیری، و سادگی توسعه و تست در معماری‌های concurrent و reactive است.

9.4.13.2 دیاگرام #

flowchart TD Publisher[Publisher / Source] Sub1[Subscriber 1] Sub2[Subscriber 2] SubN[Subscriber N] Topic[Channel / Topic] Publisher -- "Publish Data" --> Topic Topic -- "Push Update" --> Sub1 Topic -- "Push Update" --> Sub2 Topic -- "Push Update" --> SubN Sub1 -- "Subscribe" --> Topic Sub2 -- "Subscribe" --> Topic SubN -- "Subscribe" --> Topic style Topic fill:#e2f0fc,stroke:#377dbf,stroke-width:2px style Publisher fill:#f5e8ff,stroke:#b486e5,stroke-width:2px style Sub1,Sub2,SubN fill:#e9fbe7,stroke:#6dc165,stroke-width:2px

9.4.13.3 نمونه کد #

  1package main
  2
  3import (
  4	"context"
  5	"encoding/json"
  6	"fmt"
  7	"io/ioutil"
  8	"log"
  9	"net/http"
 10	"os"
 11	"time"
 12)
 13
 14const exampleAPIAddress = "https://random-data-api.com/api/stripe/random_stripe"
 15
 16type Card struct {
 17	Id          uint   `json:"id"`
 18	Uid         string `json:"uid"`
 19	ValidCard   string `json:"valid_card"`
 20	Token       string `json:"token"`
 21	InvalidCard string `json:"invalid_card"`
 22	Month       string `json:"month"`
 23	Year        string `json:"year"`
 24	CCV         string `json:"ccv"`
 25	CCVAmex     string `json:"ccv_amex"`
 26}
 27
 28type Subscription interface {
 29	Updates() <-chan Card
 30}
 31
 32type Fetcher interface {
 33	Fetch() (Card, error)
 34}
 35
 36type sub struct {
 37	fetcher Fetcher
 38	updates chan Card
 39}
 40
 41type fetcher struct {
 42	url string
 43}
 44
 45type fetchResult struct {
 46	fetchedCard Card
 47	err         error
 48}
 49
 50// NewSubscription create subscription for fetch data per freq time in second
 51func NewSubscription(ctx context.Context, fetcher Fetcher, freq uint) Subscription {
 52	s := &sub{
 53		fetcher: fetcher,
 54		updates: make(chan Card),
 55	}
 56	go s.serve(ctx, freq)
 57	return s
 58}
 59
 60func NewFetcher(url string) Fetcher {
 61	return &fetcher{
 62		url: url,
 63	}
 64}
 65
 66func (f *fetcher) Fetch() (Card, error) {
 67	return requestAPI(f.url)
 68}
 69
 70func (s *sub) serve(ctx context.Context, freq uint) {
 71	ticker := time.NewTicker(time.Duration(freq) * time.Second)
 72	done := make(chan fetchResult, 1)
 73
 74	var (
 75		fetchedCard         Card
 76		fetchResponseStream chan Card
 77		pending             bool
 78	)
 79
 80	for {
 81
 82		if pending {
 83			fetchResponseStream = s.updates
 84		} else {
 85			fetchResponseStream = nil
 86		}
 87
 88		select {
 89		case <-ticker.C:
 90			if pending {
 91				break
 92			}
 93			go func() {
 94				fetched, err := s.fetcher.Fetch()
 95				done <- fetchResult{fetched, err}
 96			}()
 97		case result := <-done:
 98			fetchedCard = result.fetchedCard
 99			if result.err != nil {
100				log.Printf("fetch got error %v", result.err)
101				break
102			}
103			pending = true
104		case fetchResponseStream <- fetchedCard:
105			pending = false
106		case <-ctx.Done():
107			return
108		}
109	}
110}
111
112func (s *sub) Updates() <-chan Card {
113	return s.updates
114}
115
116func requestAPI(url string) (Card, error) {
117	card := Card{}
118	req, err := http.NewRequest(http.MethodGet, url, nil)
119	if err != nil {
120		return Card{}, err
121	}
122	res, err := http.DefaultClient.Do(req)
123	if err != nil {
124		return Card{}, err
125	}
126	body, err := ioutil.ReadAll(res.Body)
127	if err != nil {
128		return Card{}, err
129	}
130	if err := json.Unmarshal(body, &card); err != nil {
131		return Card{}, err
132	}
133	return card, nil
134}
135
136func main() {
137	ctx, cancel := context.WithCancel(context.Background())
138	sub := NewSubscription(ctx, NewFetcher(exampleAPIAddress), 3)
139
140	time.AfterFunc(1*time.Minute, func() {
141		cancel()
142		log.Println("canceled subscription task")
143		os.Exit(0)
144	})
145
146	for card := range sub.Updates() {
147		fmt.Println(card)
148	}
149}
1$ go run main.go
2{4643 add2475a-ed64-4039-831d-0e95469752d9 371449635398431 tok_mastercard_debit 4000000000000101 01 2024 920 7875}
3{6992 a89e3d71-785a-4d37-9639-be3ce7534257 2223003122003222 tok_discover 4000000000000069 11 2024 660 5241}
4{9287 f665526e-1b34-46f5-9d9d-50362631ed0f 5200828282828210 tok_mastercard_debit 4000000000000036 05 2026 993 6272}
5{4956 e8ae8e75-0ff2-42e8-921c-5cc438d64fac 3566002020360505 tok_amex 4000000000000044 10 2024 371 9989}
6{1193 954d1b36-829b-4726-bbb7-0f5f01b3dd40 6011000990139424 tok_mastercard_debit 4000000000000341 12 2026 331 5119}

در این مثال، ابتدا یک اینترفیس به نام Subscription تعریف شده است که متدی به نام Updates دارد و کانالی از نوع Card را برمی‌گرداند. این کانال نقش مسیر ارتباطی را بین تولیدکننده و مصرف‌کننده ایفا می‌کند، به طوری که مصرف‌کننده می‌تواند به طور همزمان و غیرمسدود داده‌های جدید را دریافت کند. همچنین اینترفیس Fetcher طراحی شده که وظیفه‌ی فراخوانی API و دریافت داده‌ها را بر عهده دارد و متد Fetch را ارائه می‌دهد. این تفکیک وظایف باعث می‌شود کد قابلیت توسعه و تست بیشتری داشته باشد.

تابع NewSubscription به عنوان سازنده Subscription عمل می‌کند؛ این تابع یک struct از نوع sub ایجاد می‌کند که حاوی fetcher و یک کانال updates است. سپس متد serve به صورت یک goroutine اجرا می‌شود تا عملیات fetch را در فواصل زمانی مشخص (که با پارامتر freq تعیین می‌شود) تکرار کند. درون این متد از time.Ticker برای زمان‌بندی دقیق استفاده شده است تا به صورت منظم و بدون ایجاد سربار اضافی، داده‌ها را از API فراخوانی کند و در صورت دریافت موفقیت‌آمیز، آن‌ها را در کانال منتشر نماید. همچنین با کمک متغیر pending اطمینان حاصل می‌شود که یک fetch جدید تا قبل از اتمام fetch قبلی آغاز نشود، بنابراین از فشار بیش از حد به سرویس جلوگیری می‌شود.

مصرف‌کنندگان داده‌ها از طریق متد Updates به کانال updates دسترسی دارند و به محض دریافت داده‌های جدید، می‌توانند پردازش خود را آغاز کنند. استفاده از context.Context در این ساختار اجازه می‌دهد که در هر زمان عملیات fetch به صورت ایمن لغو شود و goroutine مربوطه به سرعت و بدون باقی ماندن در حالت بلاک‌شده خاتمه یابد. این طراحی باعث می‌شود که برنامه همزمانی بهینه‌ای داشته باشد، منابع به خوبی مدیریت شود و کد قابلیت خوانایی، توسعه و تست آسان را حفظ کند.

در کل، این الگو ترکیبی از بهترین شیوه‌های Go در مدیریت جریان داده‌های ناهمزمان، کنترل concurrency و ارتباط بین goroutineها است که برای دریافت داده‌های زنده از API یا منابع خارجی بسیار مناسب است. با چنین معماری می‌توان سیستم‌هایی تولید کرد که علاوه بر مقیاس‌پذیری بالا، مقاوم در برابر خطا و قابل کنترل نیز باشند.

9.4.13.4 کاربردها #

  • دریافت اطلاعات از تولیدکننده‌ها (Publisher) یا سیستم‌های Pub/Sub:
    الگوی Subscription به شما این امکان را می‌دهد که به سادگی به یک یا چند منبع داده (مانند سرویس‌های پیام‌رسان، سیستم‌های صف پیام، یا هر منبعی که به صورت publish/subscribe کار می‌کند) متصل شوید و داده‌های جدید را به صورت همزمان و غیرمسدود دریافت کنید. این کار باعث می‌شود مصرف‌کننده‌ها به صورت real-time یا نزدیک به real-time اطلاعات را دریافت و پردازش نمایند و از پیچیدگی‌های مدیریت اتصال یا polling مکرر بی‌نیاز شوند.
  • همگام‌سازی داده‌ها از APIهای خارجی:
    در بسیاری از برنامه‌ها نیاز است داده‌ها یا وضعیت از سرویس‌های خارجی (مانند RESTful APIها، سرویس‌های ابری یا سیستم‌های تحلیلی) به صورت دوره‌ای یا بر اساس رویداد به‌روزرسانی شوند. الگوی Subscription این امکان را فراهم می‌کند که بتوانید با تعریف یک سازوکار هوشمند برای دریافت به‌روزرسانی‌ها، مصرف داده‌ها را ساده، پایدار و بهینه کنید. این الگو به خصوص در سناریوهای real-time dashboards، اطلاع‌رسانی لحظه‌ای و هماهنگ‌سازی داده‌های توزیع‌شده کاربرد فراوان دارد.