Изучаем Storm Framework. Часть II tutorial

В первой части рассматривались базовые понятия Storm.

Разные классы задач предъявляют различные требования к надежности. Одно дело пропустить пару записей при подсчете статистики посещений, где счет идет на сотни тысяч и особая точность не нужна. И совсем другое — потерять, например, информацию о платеже клиента.

Далее рассмотрим о механизмы защиты от потери данных, которые реализованы в Storm.

Базовый пример

Spout

Если нам не важно были ли ошибки при обработке Tuple, то Spout отправляет Tuple в SpoutOutputCollector посредством вызова метода emit(new Values(...)).

Eсли мы хотим узнать успешно ли обработался Tuple, то вызов будет выглядеть как emit(new Values(...), msgId), где msgId это объект произвольного класса. В этом случае интерфейс ISpout предоставляет методы:

  • ack(Object msgId) — будет вызван если Tuple обработан
  • fail(Object msgId) — будет вызван если Tuple не обработан

где msgId — это msgId с которым был вызван SpoutOutputCollector.emit.
Пример FailAwareSpout:

public class FailAwareSpout extends BaseRichSpout {
private Message[] messages;
// Skipped ...
    private static class Message implements Serializable {
        private String message;
        private int failCount;

        private Message(String message) {
            this.message = message;
        }
    }
// Skipped ...
    @Override
    public void nextTuple() {
// Skipped ...
// Отправляем Tuple c msgId
        outputCollector.emit(new Values(messages[messageId].message), messageId);
    }

// Tuple обработан нормально
    @Override
    public void ack(Object msgId) {
        Message m = messages[(Integer) msgId];

        System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " +
                m.message + " processed successfully");
    }

// Tuple не обработан
    @Override
    public void fail(Object msgId) {
        Message m = messages[(Integer) msgId];
        if(++m.failCount > MAX_RETRY_COUNT) {
            throw new IllegalStateException("Too many message processing errors");
        }
        System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " +
                m.message + " processing failed " + "[" + m.failCount + "]");
// Вставляем в очередь на повторную обработку
        sendQueue.addLast((Integer) msgId);
    }
}


Методы nextTuple, ack и fail, вызываются в одном потоке и не требуют дополнительной синхронизации при обращении к полям Spout.

Bolt

Для того что бы Bolt мог информировать Storm о результатах обработки, он должен реализовывать интерфейс IRichBolt. Проще всего это сделать унаследовав класс BaseRichBolt.
Bolt информирует Storm o результатах своей работы посредством вызова следующих методов класса OutputCollector в методе execute(Tuple):

  • ack(Tuple) — обработка прошла успешно
  • fail(Tuple) — обработка завершилась с ошибкой

Пример FailingBolt:

public class FailingBolt extends BaseRichBolt {
    OutputCollector outputCollector;
// Skipped ...
    @Override
    public void execute(Tuple tuple) {
// Skipped ...
            outputCollector.ack(tuple); // Данные успешно обработаны
        }
        else {
// Skipped ...
            outputCollector.fail(tuple); // Обработка завершилась с ошибкой
        }
    }
// Skipped ...
}


Пример использования: BasicFailApp, Spout FailAwareSpout и Bolt FailingBolt случайным образом генерирующий ошибки обработки.

В Bolt'ах унаследованных от класса BaseBasicBolt, ack(Tuple) вызывается после выхода из метода execute автоматически.

Anchoring


При обработке входного Tuple, Bolt может генерировать более одного выходного Tuple. Если Bolt вызвал emit(sourceTuple, resultTuple) то образуется DAG с вершиной в виде исходного Tuple и потомками в виде порожденных Tuple. Storm отслеживает ошибки процессинга всех узлов графа. В случае возникновения ошибки на любом уровне иерархии, Spout, породивший исходный Tuple, будет уведомлен вызовом fail. Пример MultiplierBolt:

public class MultiplierBolt extends BaseRichBolt {
// Skipped ...
    @Override
    public void execute(Tuple tuple) {
// Генерируем несколько  исходящих Tuple из одного входящего
        for(int i = 0; i < MULTI_COUNT; ++i) {
// Anchoring, привязываем исходящие Tuple к входящему 
            outputCollector.emit(tuple, new Values(tuple.getString(0) + " - " + i));
        }
        outputCollector.ack(tuple);
    }
// Skipped ...
}


Пример использования Anchoring: TreeFailApp

В Bolt'ах унаследованных от класса BaseBasicBolt метод execute(Tuple, BasicOutputCollector) вызывается с коллектором BasicOutputCollector. Особенность BasicOutputCollector в том, что он автоматически делает Anchor на входной Tuple при emit.

Поскольку Storm является распределенной системой, Tuple могут передаваться с одного узла кластера на другой. В связи с этим Storm обеспечивает отслеживание таймаутов обработки. По умолчанию, весь граф должен быть обработан за 30 секунд, или Storm вызовет метод fail у породившего граф Spout'а. Таймаут можно изменить.

Код доступен на GitHub.

Следующая часть будет посвящена Transactional Topologies, использующихся в связке с транзакционными источниками данных.

Вверх