English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Redis 스트리ーム

Redis 스트림은 Redis 5.0 버전에서 새로 추가된 데이터 구조입니다.

Redis 스트림은 메시지 큐(MQ, Message Queue)에 주로 사용되며, Redis 자체는 Redis 발행-구독(pub/sub)을 통해 메시지 큐 기능을 구현할 수 있지만, 메시지가 영구화되지 않는 단점이 있습니다. 네트워크가 끊어지거나 Redis가 고장 나면 메시지가 손실됩니다.

간단히 말해, 발행-구독(pub/sub) 메시지를 배포할 수 있지만, 역사 메시지를 기록할 수 없습니다.

Redis 스트림은 메시지의 영구화와 메인-스태미프레이션 복제 기능을 제공하여, 어떤 클라이언트도 어떤 순간의 데이터에 접근할 수 있으며, 각 클라이언트의 접근 위치를 기억하고, 메시지가 손실되지 않도록 보장합니다.

Redis 스트림의 구조는 다음과 같습니다. 메시지 목록이 있으며, 모든 추가된 메시지를 연결합니다. 각 메시지는 유일한 ID와 해당 내용을 가집니다:

각 스트림에는 유일한 이름이 있으며, 이것이 Redis의 key입니다. 우리가 처음에 xadd 명령어로 메시지를 추가할 때 자동으로 생성됩니다.

위 그림 분석:

  • Consumer Group :소비자 그룹, XGROUP CREATE 명령어로 생성됩니다. 소비자 그룹에는 여러 개의 소비자(Consumer)가 있습니다.

  • last_delivered_id :커서, 각 소비자 그룹에 커서 last_delivered_id가 있으며, 어떤 소비자가 메시지를 읽도록 되면 커서 last_delivered_id가 앞으로 이동합니다.

  • pending_ids :소비자(Consumer)의 상태 변수는 소비자의 미확인 id를 유지하는 역할을 합니다. pending_ids는 클라이언트가 읽었지만 아직 ack(Acknowledge character: 확인 문자)를 받지 않은 메시지를 기록합니다.

메시지 큐 관련 명령어:

  • XADD - 메시지를 마지막에 추가

  • XTRIM - 대기열을 잘라서 길이를 제한

  • XDEL - 메시지 지우기

  • XLEN - 대기열이 포함하는 요소 수를 얻습니다. 즉, 메시지 길이입니다

  • XRANGE - 메시지 목록을 가져옵니다. 지우인 메시지는 자동으로 필터됩니다

  • XREVRANGE -  역으로 메시지 목록을 가져옵니다. ID가 크게됨

  • XREAD - 봉쇄 또는 비봉쇄 방식으로 메시지 목록을 가져옵니다

소비자 그룹 관련 명령어:

  • XGROUP CREATE - 소비자 그룹 생성

  • XREADGROUP 그룹 - 소비자 그룹에서 메시지 읽기

  • XACK - 메시지를 "처리됨"으로 표시

  • XGROUP SETID - 소비자 그룹에 새로운 마지막 전달 메시지 ID 설정

  • XGROUP DELCONSUMER - 소비자 지우기

  • XGROUP DESTROY - 소비자 그룹 지우기

  • XPENDING - 처리되지 않은 메시지 정보 표시

  • XCLAIM - 메시지 소유권 이전

  • XINFO - 대기열과 소비자 그룹 관련 정보 확인;

  • XINFO GROUPS - 소비자 그룹 정보 출력;

  • XINFO STREAM - 대기열 정보 출력

XADD

XADD를 사용하여 대기열에 메시지를 추가합니다. 지정된 대기열이 없으면 대기열을 생성합니다. XADD 문법 형식:

XADD key ID field value [field value ...]
  • : 큐 이름, 존재하지 않으면 생성

  • ID :메시지 ID를 사용하여 * redis가 생성한 것을 나타냅니다.自定义할 수 있지만, 증가하는 것을 보장해야 합니다。

  • field value :기록。

redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) ""1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) ""1601372323627-1"
   2) 1) "field"1"
      2) "value"1"
      3) "field"2"
      4) "value"2"
      5) "field"3"
      6) "value"3"
redis>

XTRIM

XTRIM을 사용하여 대기열을 잘라서 길이를 제한합니다. 문법 형식:

XTRIM key MAXLEN [~] count
  • :대기열 이름

  • MAXLEN :길이

  • 수량 :数量

127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) ""1601372434568-0"
   2) 1) "field"1"
      2) "A"
      3) "field"2"
      4) "B"
      5) "field"3"
      6) "C"
      7) "field"4"
      8) "D"
127.0.0.1:6379> 
redis>

XDEL

XDEL을 사용하여 메시지를 지웁니다. 문법 형식:

XDEL key ID [ID ...]
  • :대기열 이름

  • ID :消息 ID

> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) ""1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) ""3"

XLEN

XLEN을 사용하여 대기열이 포함하는 요소 수를 얻습니다. 즉, 메시지 길이입니다. 문법 형식:

XLEN key
  • :대기열 이름

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>

XRANGE

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XRANGE key start end [COUNT count]
  • :队列名

  • start :开始值, - 表示最小值

  • end :结束值, + 表示最大值

  • 수량 :数量

redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) ""1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) ""1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>

XREVRANGE

使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XREVRANGE key end start [COUNT count]
  • :队列名

  • end :结束值, + 表示最大值

  • start :开始值, - 表示最小值

  • 수량 :数量

redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) ""1601372731459-3"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>

XREAD

使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • 수량 :数量

  • 밀리초 :可选,阻塞毫秒数,没有设置就是非阻塞模式

  • :队列名

  • id :消息 ID

# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) ""1532"
            3) "event"-id"
            4) ""5"
            5) "user"-id"
            6) ""7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) ""812"
            3) "event"-id"
            4) ""9"
            5) "user"-id"
            6) ""388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

XGROUP CREATE

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-또는-$] [SETID 키 그룹이름 id]-또는-$] [DESTROY 키 그룹이름] [DELCONSUMER 키 그룹이름 소비자이름]
  • : 큐 이름, 존재하지 않으면 생성

  • 그룹이름 : 그룹 이름。

  • $ : 끝에서부터 소비를 나타내며, 새 메시지만 받아들이고 현재 Stream 메시지는 모두 무시됩니다。

시작에서부터 소비 시작:

XGROUP CREATE mystream consumer-그룹-이름 0-0

끝에서부터 소비 시작:

XGROUP CREATE mystream consumer-그룹-이름 $

XREADGROUP 그룹

XREADGROUP GROUP으로 소비 그룹의 메시지를 읽는 방법, 문법 형식:

XREADGROUP 그룹 group consumer [COUNT 수량] [BLOCK 밀리초] [NOACK] STREAMS 키 [키 [...]] ID [ID [...]]
  • 그룹 : 소비 그룹 이름

  • 소비자 : 소비자 이름。

  • 수량 : 읽기 수량。

  • 밀리초 : 블록 밀리초。

  • : 큐 이름。

  • ID : 메시지 ID。

XREADGROUP 그룹 consumer-그룹-이름 consumer-이름 COUNT 1 STREAMS mystream >