development

좋은 속도 제한 알고리즘은 무엇입니까?

big-blog 2020. 6. 15. 07:52
반응형

좋은 속도 제한 알고리즘은 무엇입니까?


나는 의사 코드 또는 더 나은 파이썬을 사용할 수 있습니다. Python IRC 봇에 대한 속도 제한 대기열을 구현하려고하는데 부분적으로 작동하지만 누군가가 제한보다 적은 메시지를 트리거하는 경우 (예 : 속도 제한은 8 초당 5 메시지, 사람은 4 만 트리거), 다음 트리거가 8 초 (예 : 16 초 후)를 초과하면 봇은 메시지를 전송하지만 8 초 기간이 경과 한 후 필요하지 않더라도 대기열이 가득 차서 8 초 동안 대기합니다.


여기에 간단한 알고리즘은 그들이 너무 빨리 도착하면, 당신이 원하는 경우 단지 메시지를 삭제하려면 (대신 큐가 임의의 큰 얻을 수 있기 때문에 의미가있는, 그들 큐잉) :

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

이 솔루션에는 데이터 구조, 타이머 등이 ​​없으며 제대로 작동합니다. :)이를 확인하기 위해 '허용'은 최대 5/8 단위 / 초, 즉 8 초당 최대 5 단위의 속도로 증가합니다. 전달되는 모든 메시지는 한 단위를 빼므로 8 초마다 5 개 이상의 메시지를 보낼 수 없습니다.

참고 rate비 소수점 아래 부분없이, 즉 정수를해야한다, 또는 알고리즘 (실제 속도는되지 않습니다 제대로 작동하지 않습니다 rate/per). 예를 들어 1.0으로 증가 rate=0.5; per=1.0;하지 않으므로 작동하지 않습니다 allowance. 그러나 rate=1.0; per=2.0;잘 작동합니다.


큐에 넣는 함수 앞에이 데코레이터 @RateLimited (ratepersec)를 사용하십시오.

기본적으로 마지막 시간 이후 1 / rate 초가 지 났는지 확인하고 그렇지 않은 경우 나머지 시간을 기다립니다. 그렇지 않으면 기다리지 않습니다. 이렇게하면 속도 / 초로 효과적으로 제한됩니다. 데코레이터는 속도 제한이 필요한 모든 기능에 적용 할 수 있습니다.

귀하의 경우 8 초마다 최대 5 개의 메시지를 원하면 sendToQueue 함수 전에 @RateLimited (0.625)를 사용하십시오.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)

토큰 버킷은 구현이 매우 간단합니다.

5 개의 토큰이있는 버킷으로 시작하십시오.

5/8 초마다 : 버킷에 5 개 미만의 토큰이 있으면 토큰을 추가하십시오.

메시지를 보낼 때마다 : 버킷에 1 개 이상의 토큰이있는 경우 하나의 토큰을 꺼내어 메시지를 보냅니다. 그렇지 않으면, 메시지 / 무엇을 기다리거나 삭제하십시오.

(실제로 실제 코드에서는 실제 토큰 대신 정수 카운터를 사용하며 타임 스탬프를 저장하여 5/8 단계마다 최적화 할 수 있습니다)


질문을 다시 읽고 속도 제한이 8 초마다 완전히 재설정되면 다음과 같이 수정됩니다.

last_send오래 전 (예 : 시대)에 타임 스탬프로 시작하십시오 . 또한 동일한 5 토큰 버킷으로 시작하십시오.

5/8 초마다 규칙을칩니다.

메시지를 보낼 때마다 : 먼저 last_send8 초 이상 인지 확인하십시오 . 그렇다면 버킷을 채우십시오 (5 개의 토큰으로 설정). 둘째, 버킷에 토큰이 있으면 메시지를 보내십시오 (그렇지 않으면 drop / wait / etc 등). 셋째, last_send지금 설정하십시오 .

해당 시나리오에서 작동합니다.


실제로 이와 같은 전략을 사용하여 IRC 봇을 작성했습니다 (첫 번째 방법). 파이썬이 아닌 Perl에 있지만 다음과 같은 코드가 있습니다.

여기서 첫 번째 부분은 버킷에 토큰 추가를 처리합니다. 시간 (두 번째 행부터 마지막 ​​행까지)을 기준으로 토큰을 추가 한 다음 마지막 행이 버킷 내용을 최대 (MESSAGE_BURST)로 고정하는 최적화를 볼 수 있습니다.

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn은 전달되는 데이터 구조입니다. 이것은 일상적으로 실행되는 메소드 내부에 있습니다 (다음에 무언가 할 시간을 계산하고 그 시간이나 네트워크 트래픽을 얻을 때까지 잠자기합니다). 메소드의 다음 부분은 전송을 처리합니다. 메시지와 관련된 우선 순위가 있기 때문에 다소 복잡합니다.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

그것은 첫 번째 대기열이며, 무엇이든간에 실행됩니다. 홍수로 인해 연결이 끊어 지더라도. 서버의 PING에 응답하는 것과 같이 매우 중요한 작업에 사용됩니다. 다음으로 나머지 대기열 :

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

Finally, the bucket status is saved back to the $conn data structure (actually a bit later in the method; it first calculates how soon it'll have more work)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

As you can see, the actual bucket handling code is very small — about four lines. The rest of the code is priority queue handling. The bot has priority queues so that e.g., someone chatting with it can't prevent it from doing its important kick/ban duties.


to block processing until the message can be sent, thus queuing up further messages, antti's beautiful solution may also be modified like this:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

it just waits until enough allowance is there to send the message. to not start with two times the rate, allowance may also initialized with 0.


Keep the time that the last five lines were sent. Hold the queued messages until the time the fifth-most-recent message (if it exists) is a least 8 seconds in the past (with last_five as an array of times):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()

One solution is to attach a timestamp to each queue item and to discard the item after 8 seconds have passed. You can perform this check each time the queue is added to.

This only works if you limit the queue size to 5 and discard any additions whilst the queue is full.


If someone still interested, I use this simple callable class in conjunction with a timed LRU key value storage to limit request rate per IP. Uses a deque, but can rewritten to be used with a list instead.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")

Just a python implementation of a code from accepted answer.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler

How about this:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

I needed a variation in Scala. Here it is:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

Here is how it can be used:

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}

참고URL : https://stackoverflow.com/questions/667508/whats-a-good-rate-limiting-algorithm

반응형