9.4.15 الگو Subscription

9.4.15 الگو Subscription

9.4.15.1 توضیحات #

الگو Subscription یکی از الگوهای کاربردی برای پیاده سازی consumer می باشد که به یک آدرسی مشترک شوید و در بازده زمانی مختلف درخواست دهید و یکسری اطلاعات دریافت کنید.

9.4.15.2 دیاگرام #

9.4.15.3 نمونه کد #

  1package main
  2
  3import (
  4	"context"
  5	"encoding/json"
  6	"fmt"
  7	"io/ioutil"
  8	"log"
  9	"net/http"
 10	"os"
 11	"time"
 12)
 13
 14const exampleAPIAddress = "https://random-data-api.com/api/stripe/random_stripe"
 15
 16type Card struct {
 17	Id          uint   `json:"id"`
 18	Uid         string `json:"uid"`
 19	ValidCard   string `json:"valid_card"`
 20	Token       string `json:"token"`
 21	InvalidCard string `json:"invalid_card"`
 22	Month       string `json:"month"`
 23	Year        string `json:"year"`
 24	CCV         string `json:"ccv"`
 25	CCVAmex     string `json:"ccv_amex"`
 26}
 27
 28type Subscription interface {
 29	Updates() <-chan Card
 30}
 31
 32type Fetcher interface {
 33	Fetch() (Card, error)
 34}
 35
 36type sub struct {
 37	fetcher Fetcher
 38	updates chan Card
 39}
 40
 41type fetcher struct {
 42	url string
 43}
 44
 45type fetchResult struct {
 46	fetchedCard Card
 47	err         error
 48}
 49
 50// NewSubscription create subscription for fetch data per freq time in second
 51func NewSubscription(ctx context.Context, fetcher Fetcher, freq uint) Subscription {
 52	s := &sub{
 53		fetcher: fetcher,
 54		updates: make(chan Card),
 55	}
 56	go s.serve(ctx, freq)
 57	return s
 58}
 59
 60func NewFetcher(url string) Fetcher {
 61	return &fetcher{
 62		url: url,
 63	}
 64}
 65
 66func (f *fetcher) Fetch() (Card, error) {
 67	return requestAPI(f.url)
 68}
 69
 70func (s *sub) serve(ctx context.Context, freq uint) {
 71	ticker := time.NewTicker(time.Duration(freq) * time.Second)
 72	done := make(chan fetchResult, 1)
 73
 74	var (
 75		fetchedCard         Card
 76		fetchResponseStream chan Card
 77		pending             bool
 78	)
 79
 80	for {
 81
 82		if pending {
 83			fetchResponseStream = s.updates
 84		} else {
 85			fetchResponseStream = nil
 86		}
 87
 88		select {
 89		case <-ticker.C:
 90			if pending {
 91				break
 92			}
 93			go func() {
 94				fetched, err := s.fetcher.Fetch()
 95				done <- fetchResult{fetched, err}
 96			}()
 97		case result := <-done:
 98			fetchedCard = result.fetchedCard
 99			if result.err != nil {
100				log.Printf("fetch got error %v", result.err)
101				break
102			}
103			pending = true
104		case fetchResponseStream <- fetchedCard:
105			pending = false
106		case <-ctx.Done():
107			return
108		}
109	}
110}
111
112func (s *sub) Updates() <-chan Card {
113	return s.updates
114}
115
116func requestAPI(url string) (Card, error) {
117	card := Card{}
118	req, err := http.NewRequest(http.MethodGet, url, nil)
119	if err != nil {
120		return Card{}, err
121	}
122	res, err := http.DefaultClient.Do(req)
123	if err != nil {
124		return Card{}, err
125	}
126	body, err := ioutil.ReadAll(res.Body)
127	if err != nil {
128		return Card{}, err
129	}
130	if err := json.Unmarshal(body, &card); err != nil {
131		return Card{}, err
132	}
133	return card, nil
134}
135
136func main() {
137	ctx, cancel := context.WithCancel(context.Background())
138	sub := NewSubscription(ctx, NewFetcher(exampleAPIAddress), 3)
139
140	time.AfterFunc(1*time.Minute, func() {
141		cancel()
142		log.Println("canceled subscription task")
143		os.Exit(0)
144	})
145
146	for card := range sub.Updates() {
147		fmt.Println(card)
148	}
149}
1$ go run main.go
2{4643 add2475a-ed64-4039-831d-0e95469752d9 371449635398431 tok_mastercard_debit 4000000000000101 01 2024 920 7875}
3{6992 a89e3d71-785a-4d37-9639-be3ce7534257 2223003122003222 tok_discover 4000000000000069 11 2024 660 5241}
4{9287 f665526e-1b34-46f5-9d9d-50362631ed0f 5200828282828210 tok_mastercard_debit 4000000000000036 05 2026 993 6272}
5{4956 e8ae8e75-0ff2-42e8-921c-5cc438d64fac 3566002020360505 tok_amex 4000000000000044 10 2024 371 9989}
6{1193 954d1b36-829b-4726-bbb7-0f5f01b3dd40 6011000990139424 tok_mastercard_debit 4000000000000341 12 2026 331 5119}
  • برای گرفتن دیتا ما یک اینترفیس به نام Subscription با متد Updates تعریف کردیم.
1type Subscription interface {
2	Updates() <-chan Card
3}
  • سپس یک اینترفیس دیگر به نام Fetcher برای گرفتن داده از API با متد Fetch تعریف کردیم.
1type Fetcher interface {
2	Fetch() (Card, error)
3}
  • اکنون یک تابع به نام NewSubscription ایجاد کردیم context, NetFetcher و مدت زمان فاصله بین هر درخواست را پاس دادیم.
1ctx, cancel := context.WithCancel(context.Background())
2sub := NewSubscription(ctx, NewFetcher(exampleAPIAddress), 3)
  • بعد از اینکه ساختار sub را راه اندازی کردیم متد serve را برای اجرا داخل گوروتین قرار دادیم.
1func NewSubscription(ctx context.Context, fetcher Fetcher, freq uint) Subscription {
2	s := &sub{
3		fetcher: fetcher,
4		updates: make(chan Card),
5	}
6	go s.serve(ctx, freq)
7	return s
8}

حال پس از هر time.Ticker به آدرس API مورد نظر بواسطه متد Fetch درخواست ارسال می شود و اطلاعات دریافت می شود به داخل کانال می فرستیم. در نهایت از طریق متد Updates می توانیم اطلاعات را دریافت کنیم.

9.4.15.4 کاربردها #

  • دریافت اطلاعات از یک تولید کننده (Publisher) یا آدرسی (Pub/Sub)
  • همگام سازی اطلاعات از یک API