semaphore 5.0

semaphore 5.0

Что нового будет в этом релизе

В компании Lazada semaphore используется для ограничения пропускной способности некоторых сервисов. Лимиты коррелируют с результатами их стресс-тестирования, но на практике профиль нагрузки не однороден, поэтому возникла задача сделать semaphore адаптируемым к такой нагрузке.

Суть проблемы

Ограничители инициализируются в момент старта сервиса и используются в middleware при обработке входящего трафика. Сейчас, чтобы изменить лимиты, требуется перезапуск сервиса, что не отвечает требованиям высокой доступности.

Задача

Необходимо доработать semaphore так, чтобы появилась возможность менять его вместимость динамически. Сложность заключается в том, что он сделан на каналах, и это накладывает на разработчика определённые ограничения.

Возможные решения

Установка барьера

В случае возникновения определенных событий можно устанавливать барьер, который просто будет забирать часть свободных мест.

s := semaphore.New(1000)
w := watcher.New()
w.On(event.Overloading, func () { s.SetBarrier(200) }) // only 800 places will be available
go w.Watch()

Можно заранее резервировать необходимое количество мест с некоторым запасом и на базе этого решения продумать возможность балансировки.

sem1 := semaphore.New(1000)
sem1.SetBarrier(500)

sem2 := semaphore.New(1000)
sem2.SetBarrier(500)

func rebalancing(barrier int, from, to semaphore.Semaphore) {
	from.SetBarrier(from.Barrier() + barrier / 2)
	to.SetBarrier(to.Barrier() - barrier / 2)
}

w := watcher.New()
w.On(event.Sem1Overloading, func () { rebalancing(400, sem1, sem2)
	/* sem1.Capacity == 300, sem2.Capacity == 700 */ })
w.On(event.Sem2Overloading, func () { rebalancing(400, sem2, sem1) /* vice versa */ })

Отказ от каналов

Так как нет простого способа изменить вместимость канала, то можно попробовать переписать semaphore на использование пакета sync/atomic, в частности на использование atomic.CompareAndSwap. Например, так написан github.com/marusama/semaphore.

func (s *semaphore) SetLimit(limit int) {
	for {
		state := atomic.LoadUint64(&s.state)
		if atomic.CompareAndSwapUint64(&s.state, state, uint64(limit)<<32+state&0xFFFFFFFF) {
			newBroadcastCh := make(chan struct{})
			s.lock.Lock()
			oldBroadcastCh := s.broadcastCh
			s.broadcastCh = newBroadcastCh
			s.lock.Unlock()

			// send broadcast signal
			close(oldBroadcastCh)
			return
		}
	}
}

Разработка

Планируемая версия будет содержать не только решение озвученной выше задачи, но и ряд дополнительных улучшений. Подробнее:

Теги

go semaphore

Автор

Камиль Самигуллин
Камиль Самигуллин

Разработчик

Недавно передо мной встала задача сделать возможным изменять размер semaphore под нагрузкой. На данный момент я вижу два возможных решения, одно из которых планирую выбрать опытным путём.

Спонсоры

Опубликовано

23.12.2017