Lock-free структуры данных. Concurrent maps: skip list. У java есть реализация списка пропуска С какого уровня начинать искать? Определение L(n)

05.01.2024 Безопасность

7 ответов

Поскольку вы упомянули Список, который является как индексируемым (я предполагаю, что вы хотите скорейшего извлечения), так и для того, чтобы разрешать дубликаты, я бы посоветовал вам использовать пользовательский набор с LinkedList или ArrayList.

Вам нужно иметь базовый набор, например, HashSet и продолжать добавлять к нему значения. Если вы сталкиваетесь с дубликатом, значение этого набора должно указывать на список. Итак, у вас будет и быстрый поиск, и, конечно же, вы сохраните свои объекты в виде коллекции psuedo.

Это должно дать вам хорошую эффективность для извлечения. В идеале, если ваши Ключи не дублируются, вы достигнете O (1) в качестве скорости поиска.

Этот ответ задерживается на 3 года, но я надеюсь, что он будет полезен тем, кто хочет получить список пропусков Java с этого момента:)

Это решение позволяет дублировать, как вы просили. Я следую примерно руководству здесь http://igoro.com/archive/skip-lists-are-fascinating , поэтому сложности похожи на то, что удалить стоит O (nlogn) - как я это делал "Не пытайтесь использовать двусвязные узлы, я предполагаю, что это приведет к тому, что удалить до O (logn).

Код содержит: интерфейс, список пропуска, реализующий интерфейс, и класс node. Он также является общим.

Вы можете настроить параметр LEVELS для производительности, но помните компромисс между пространством и временем.

> search(T data); } public class SkipList> implements SkippableList { public static void main(String args) { SkipList sl = new SkipList<>(); int data = {4,2,7,0,9,1,3,7,3,4,5,6,0,2,8}; for (int i: data) { sl.insert(i); } sl.print(); sl.search(4); sl.delete(9); sl.print(); sl.insert(69); sl.print(); sl.search(69); } private final SkipNode head = new SkipNode<> SkipNode = new SkipNode<>(data); for (int i = 0; i < LEVELS; i++) { if (rand.nextInt((int) Math.pow(2, i)) == 0) { //insert with prob = 1/(2^i) insert(SkipNode, i); } } } @Override public boolean delete(T target) { System.out.println("Deleting " + target.toString()); SkipNode victim = search(target, false); if (victim == null) return false; victim.data = null; for (int i = 0; i < LEVELS; i++) { head.refreshAfterDelete(i); } System.out.println(); return true; } @Override public SkipNode search(T data) { return search(data, true); } @Override public void print() { for (int i = 0; i < LEVELS; i++) { head.print(i); } System.out.println(); } private void insert(SkipNode result = null; for (int i = LEVELS-1; i >= 0; i--) { if ((result = head.search(data, i, print)) != null) { if (print) { System.out.println("Found " + data.toString() + " at level " + i + ", so stoppped"); System.out.println(); } break; } } return result; } } class SkipNode> next = (SkipNode current = this.getNext(level); while (current != null && current.getNext(level) != null) { if (current.getNext(level).data == null) { SkipNode result = null; SkipNode < 1) { if (current.data.equals(data)) { result = current; break; } current = current.getNext(level); } return result; } void insert(SkipNode SkipNode, int level) { SkipNode current = this.getNext(level); if (current == null) { this.setNext(SkipNode, level); return; } if (SkipNode.data.compareTo(current.data) < 1) { this.setNext(SkipNode, level); SkipNode.setNext(current, level); return; } while (current.getNext(level) != null && current.data.compareTo(SkipNode.data) < 1 && current.getNext(level).data.compareTo(SkipNode.data) < 1) { current = current.getNext(level); } SkipNode successor = current.getNext(level); current.setNext(SkipNode, level); SkipNode.setNext(successor, level); } void print(int level) { System.out.print("level " + level + ": ["); int length = 0; SkipNode current = this.getNext(level); while (current != null) { length++; System.out.print(current.data.toString() + " "); current = current.getNext(level); } System.out.println("], length: " + length); } }

Вы можете использовать приведенное ниже код, чтобы сделать свой собственный базовый скипист :

1) Сделайте start и конец до represent start and end of skip list .

2) Add the nodes и assign pointers до следующего based on

If(node is even) then ,assign a fast lane pointer with next pointer else assign only pointer to next node

Java-код для базового списка пропуска (вы можете добавить дополнительные функции):

Public class MyClass { public static void main(String args) { Skiplist skiplist=new Skiplist(); Node n1=new Node(); Node n2=new Node(); Node n3=new Node(); Node n4=new Node(); Node n5=new Node(); Node n6=new Node(); n1.setData(1); n2.setData(2); n3.setData(3); n4.setData(4); n5.setData(5); n6.setData(6); skiplist.insert(n1); skiplist.insert(n2); skiplist.insert(n3); skiplist.insert(n4); skiplist.insert(n5); skiplist.insert(n6); /*print all nodes*/ skiplist.display(); System.out.println(); /* print only fast lane node*/ skiplist.displayFast(); } } class Node{ private int data; private Node one_next; //contain pointer to next node private Node two_next; //pointer to node after the very next node public int getData() { return data; } public void setData(int data) { this.data = data; } public Node getOne_next() { return one_next; } public void setOne_next(Node one_next) { this.one_next = one_next; } public Node getTwo_next() { return two_next; } public void setTwo_next(Node two_next) { this.two_next = two_next; } } class Skiplist{ Node start; //start pointer to skip list Node head; Node temp_next; //pointer to store last used fast lane node Node end; //end of skip list int length; public Skiplist(){ start=new Node(); end=new Node(); length=0; temp_next=start; } public void insert(Node node){ /*if skip list is empty */ if(length==0){ start.setOne_next(node); node.setOne_next(end); temp_next.setTwo_next(end); head=start; length++; } else{ length++; Node temp=start.getOne_next(); Node prev=start; while(temp != end){ prev=temp; temp=temp.getOne_next(); } /*add a fast lane pointer for even no of nodes*/ if(length%2==0){ prev.setOne_next(node); node.setOne_next(end); temp_next.setTwo_next(node); temp_next=node; node.setTwo_next(end); } /*odd no of node will not contain fast lane pointer*/ else{ prev.setOne_next(node); node.setOne_next(end); } } } public void display(){ System.out.println("--Simple Traversal--"); Node temp=start.getOne_next(); while(temp != end){ System.out.print(temp.getData()+"=>"); temp=temp.getOne_next(); } } public void displayFast(){ System.out.println("--Fast Lane Traversal--"); Node temp=start.getTwo_next(); while(temp !=end){ System.out.print(temp.getData()+"==>"); temp=temp.getTwo_next(); } } }

Вывод:

Простой обход -

1 = > 2 = > 3 = > 4 = > 5 = > 6 = >

Быстрая перемотка дорожки -

2 == > 4 == > 6 == >

Когда вы создаете ConcurrentSkipListSet , вы передаете компаратор в конструктор.

New ConcurrentSkipListSet<>(new ExampleComparator()); public class ExampleComparator implements Comparator {//your impl }

Вы можете создать компаратор, который сделает ваш SkipListSet вести себя как обычный список.

Я не утверждаю, что это моя собственная реализация. Я просто не могу вспомнить, где я его нашел. Если вы знаете, дайте мне знать, и я обновлю. Это работает для меня хорошо:

Public class SkipList> implements Iterable { Node _head = new Node<>(null, 33); private final Random rand = new Random(); private int _levels = 1; private AtomicInteger size = new AtomicInteger(0); ///

/// Inserts a value into the skip list. /// public void insert(T value) { // Determine the level of the new node. Generate a random number R. The // number of // 1-bits before we encounter the first 0-bit is the level of the node. // Since R is // 32-bit, the level can be at most 32. int level = 0; size.incrementAndGet(); for (int R = rand.nextInt(); (R & 1) == 1; R >>= 1) { level++; if (level == _levels) { _levels++; break; } } // Insert this node into the skip list Node newNode = new Node<>(value, level + 1); Node cur = _head; for (int i = _levels - 1; i >= 0; i--) { for (; cur.next[i] != null; cur = cur.next[i]) { if (cur.next[i].getValue().compareTo(value) > 0) break; } if (i <= level) { newNode.next[i] = cur.next[i]; cur.next[i] = newNode; } } } /// /// Returns whether a particular value already exists in the skip list /// public boolean contains(T value) { Node cur = _head; for (int i = _levels - 1; i >= 0; i--) { for (; cur.next[i] != null; cur = cur.next[i]) { if (cur.next[i].getValue().compareTo(value) > 0) break; if (cur.next[i].getValue().compareTo(value) == 0) return true; } } return false; } /// /// Attempts to remove one occurence of a particular value from the skip /// list. Returns /// whether the value was found in the skip list. /// public boolean remove(T value) { Node cur = _head; boolean found = false; for (int i = _levels - 1; i >= 0; i--) { for (; cur.next[i] != null; cur = cur.next[i]) { if (cur.next[i].getValue().compareTo(value) == 0) { found = true; cur.next[i] = cur.next[i].next[i]; break; } if (cur.next[i].getValue().compareTo(value) > 0) break; } } if (found) size.decrementAndGet(); return found; } @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public Iterator iterator() { return new SkipListIterator(this, 0); } public int size() { return size.get(); } public Double toArray() { Double a = new Double; int i = 0; for (T t: this) { a[i] = (Double) t; i++; } return a; } } class Node> { public Node next; public N value; @SuppressWarnings("unchecked") public Node(N value, int level) { this.value = value; next = new Node; } public N getValue() { return value; } public Node getNext() { return next; } public Node getNext(int level) { return next; } public void setNext(Node next) { this.next = next; } } class SkipListIterator> implements Iterator { SkipList list; Node current; int level; public SkipListIterator(SkipList list, int level) { this.list = list; this.current = list._head; this.level = level; } public boolean hasNext() { return current.getNext(level) != null; } public E next() { current = current.getNext(level); return current.getValue(); } public void remove() throws UnsupportedOperationException { throw new UnsupportedOperationException(); } }

Исправлена ​​ошибка в реализации, предоставляемой @PoweredByRice. Он выбросил NPE для случаев, когда удаленный node был первым node. Другие обновления включают переименованные имена переменных и обратную печать порядка списка пропусков.

Import java.util.Random; interface SkippableList> { int LEVELS = 5; boolean delete(T target); void print(); void insert(T data); SkipNode search(T data); } class SkipNode> { N data; @SuppressWarnings("unchecked") SkipNode next = (SkipNode) new SkipNode; SkipNode(N data) { this.data = data; } void refreshAfterDelete(int level) { SkipNode current = this; while (current != null && current.getNext(level) != null) { if (current.getNext(level).data == null) { SkipNode successor = current.getNext(level).getNext(level); current.setNext(successor, level); return; } current = current.getNext(level); } } void setNext(SkipNode next, int level) { this.next = next; } SkipNode getNext(int level) { return this.next; } SkipNode search(N data, int level, boolean print) { if (print) { System.out.print("Searching for: " + data + " at "); print(level); } SkipNode result = null; SkipNode current = this.getNext(level); while (current != null && current.data.compareTo(data) < 1) { if (current.data.equals(data)) { result = current; break; } current = current.getNext(level); } return result; } void insert(SkipNode skipNode, int level) { SkipNode current = this.getNext(level); if (current == null) { this.setNext(skipNode, level); return; } if (skipNode.data.compareTo(current.data) < 1) { this.setNext(skipNode, level); skipNode.setNext(current, level); return; } while (current.getNext(level) != null && current.data.compareTo(skipNode.data) < 1 && current.getNext(level).data.compareTo(skipNode.data) < 1) { current = current.getNext(level); } SkipNode successor = current.getNext(level); current.setNext(skipNode, level); skipNode.setNext(successor, level); } void print(int level) { System.out.print("level " + level + ": [ "); int length = 0; SkipNode current = this.getNext(level); while (current != null) { length++; System.out.print(current.data + " "); current = current.getNext(level); } System.out.println("], length: " + length); } } public class SkipList> implements SkippableList { private final SkipNode head = new SkipNode<>(null); private final Random rand = new Random(); @Override public void insert(T data) { SkipNode skipNode = new SkipNode<>(data); for (int i = 0; i < LEVELS; i++) { if (rand.nextInt((int) Math.pow(2, i)) == 0) { //insert with prob = 1/(2^i) insert(skipNode, i); } } } @Override public boolean delete(T target) { System.out.println("Deleting " + target); SkipNode victim = search(target, true); if (victim == null) return false; victim.data = null; for (int i = 0; i < LEVELS; i++) { head.refreshAfterDelete(i); } System.out.println("deleted..."); return true; } @Override public SkipNode search(T data) { return search(data, true); } @Override public void print() { for (int i = LEVELS-1; i >= 0 ; i--) { head.print(i); } System.out.println(); } private void insert(SkipNode SkipNode, int level) { head.insert(SkipNode, level); } private SkipNode search(T data, boolean print) { SkipNode result = null; for (int i = LEVELS-1; i >= 0; i--) { if ((result = head.search(data, i, print)) != null) { if (print) { System.out.println("Found " + data.toString() + " at level " + i + ", so stopped"); System.out.println(); } break; } } return result; } public static void main(String args) { SkipList sl = new SkipList<>(); int data = {4,2,7,0,9,1,3,7,3,4,5,6,0,2,8}; for (int i: data) { sl.insert(i); } sl.print(); sl.search(4); sl.delete(4); System.out.println("Inserting 10"); sl.insert(10); sl.print(); sl.search(10); } }

В предыдущих статьях (раз , два) мы рассматривали классический hash map с хеш-таблицей и списком коллизий. Был построен lock-free ordered list, который послужил нам основой для lock-free hash map.
К сожалению, списки характеризуются линейной сложностью поиска O(N) , где N - число элементов в списке, так что наш алгоритм lock-free ordered list сам по себе представляет небольшой интерес при больших N .
Или все же представляет?..

Существует довольно любопытная структура данных - skip-list , основанная на обычном односвязном списке. Впервые она была описана в 1990 году W.Pugh, перевод его оригинальной статьи есть на хабре. Эта структура представляет для нас интерес, так как, имея алгоритм lock-free ordered list, довольно легко реализовать lock-free skip-list.
Для начала - немного о принципах построения skip-list. Skip-list представляет собой многоуровневый список: каждый элемент (иногда называемый башней, tower) имеет некоторую высоту h .


Высота h выбирается случайным образом из диапазона , где Hmax - максимально возможная высота, обычно 16 или 32. Вероятность того, что высота башни равна единице, есть P = 1/2 . Вероятность высоты k есть:

Несмотря на кажущуюся сложность вычисления высоты, её можно рассчитать довольно просто, например, так: h = lsb(rand()) , где lsb - номер младшего значащего бита числа.

Magic 1/2

На самом деле, константа 1/2 - это мое упрощение, в оригинальной статье идет речь о 0 < p < 1 и исследуется поведение skip-list при различных значениях p . Но для практической реализации p = 1/2 - наилучшее значение, мне кажется.


Получается, что чем больше уровень, тем более список на этом уровне разрежен по сравнению с нижележащими. Эта разреженность наряду с вероятностной природой дает оценку сложности поиска O(log N) , - такую же, как для бинарных самобалансирующихся деревьев.
Поиск в skip-list"е довольно прост: начиная с head-башни, которая имеет максимальную высоту, проходим по самому верхнему уровню, пока не найдем элемент с ключом больше искомого (или не упремся в хвост tail), спускаемся на уровень ниже, ищем аналогично в нем и т.д., пока не попадем на уровень 0, самый нижний; пример поиска ключа 34:

Lock-free skip list

Для построения lock-free skip-list у нас уже есть lock-free алгоритм для каждого уровня. Осталось разработать приемы работы с уровнями. Казалось бы, невозможно атомарно вставить узел-башню высотой, скажем, 20, - ведь для этого нужно атомарно поменять 20 указателей! Оказывается, этого и не нужно, достаточно уметь атомарно менять один указатель, - то, что мы уже умеем делать в lock-free list.
Рассмотрим, как происходит вставка в lock-free skip-list. Будем вставлять элемент-башню высотой 5 с ключом 23. Первым этапом мы ищем позицию вставки, двигаясь по уровням сверху вниз. В результате у нас получается массив prev позиций вставки на каждом уровне:


Далее, вставляем новый элемент на уровень 0, самый нижний. Вставку в lock-free список мы уже умеем делать:


Всё, - элемент становится частью списка, его можно найти, он становится достижимым , несмотря на то, что вся башня целиком ещё не вставлена в skip-list.
Далее мы не торопясь вставляем нашу башню на уровни выше, снизу вверх:


Эти вставки имеют уже второстепенное значение, призваны повысить эффективность поиска, но никак не влияют на принципиальную достижимость нового ключа.
Удаление элемента происходит в две фазы: сначала мы находим удаляемый элемент и помечаем его как логически удаленный, используя прием marked pointer. Сложность в том, что для башни мы должны пометить все уровни элемента, начиная с самого верхнего:


После того, как все уровни элемента-башни помечены, делается физическое удаление (точнее говоря, исключение элемента из списка, так как удаление памяти под элемент выполняется Hazard Pointer или RCU), также сверху вниз:


На каждом уровне применяется алгоритм вставки/удаления из обычного lock-free ordered list, который мы уже рассматривали.

Как видим, процедуры вставки/удаления из lock-free skip-list многошаговые, на каждом шаге возможна интерференция с конкурирующими операциями, поэтому при программировании skip-list нужна особая аккуратность. Например, при вставке мы сначала ищем позиции в списках на всех уровнях и формируем массивы prev . Вполне возможно, что в процессе вставки список на каком-то уровне изменится и эти позиции станут невалидными. В этом случае следует обновить prev , то есть найти вставляемый элемент, и продолжить вставку, начиная с уровня, на котором произошел облом.
Более интересна ситуация, когда происходит одновременная вставка ключа K и его удаление. Такое вполне возможно: вставка считается успешно выполненной, когда мы связали элемент на уровне 0 списка. После этого элемент уже достижим и его вполне можно удалить, несмотря на то, что он ещё не до конца вставлен в верхние уровни. Для разрешения коллизии вставки и удаления очень важен порядок: вставка производится снизу вверх, а удаление (точнее, его первая фаза - логическое удаление, marked pointer) - противоходом, сверху вниз. В таком случае процедура вставки обязательно увидит на каком-либо уровне метку удаления и немедленно прекратит свою работу.
Процедура удаления также может быть конкурентной на обоих своих фазах. На фазе логического удаления, когда проставляются метки на башне сверху вниз, нам конкуренция не страшна. А вот на фазе исключения удаляемого элемента из списков любое изменение skip-list"а, то есть нарушение массивов prev и found , определяющих позицию удаляемого элемента, приводит к тому, что нам надо эти массивы сформировать заново. Но метки уже проставлены и функция поиска просто не найдет удаляемый элемент! Для разрешения этой ситуации мы наделяем функцию поиска несвойственной ей работой: при обнаружении помеченного на каком-либо уровне элемента функция поиска исключает (unlink) этот элемент из списка этого уровня, то есть помогает удалять элементы. После исключения функция возобновляет поиск с самого начала, то есть с самого верхнего уровня (здесь возможны вариации, но самое простое - начать с начала). Это типичный пример взаимопомощи, часто встречающийся в lock-free программировании: один поток помогает другому делать его работу. Именно поэтому функция find() во многих lock-free контейнерах не является константной - поиск может изменить контейнер.
Чем ещё характеризуется skip-list? Во-первых, это упорядоченный map, в отличие от hash map, то есть поддерживает операции извлечения минимального extract_min() и максимального extract_max() ключей.
Во-вторых, skip-list прожорлив для схемы Hazard Pointrer (HP): при максимальной высоте башни 32 элементы массивов prev и found , определяющих искомую позицию, должны быть защищены hazard pointer"ами, то есть нам требуется минимум 64 hazard pointer"а (на самом деле, в реализации libcds – 66). Это довольно много для схемы HP, см. подробнее её устройство . Для схемы RCU некоторую сложность представляет реализация метода find() , так как этот метод может удалять элементы, а схема RCU требует, чтобы удаление исключение (unlink) элемента из списка было под критической секцией RCU, а удаление деаллокация памяти - вне критической секции, иначе возможен deadlock.
Интересную практическую задачу представляет собой реализация башен для высоты более 1. Сейчас в реализации skip-list в библиотеке libcds память под башни высотой более 1 распределяется отдельно от памяти под элемент даже для интрузивного варианта. Учитывая вероятностную природу высоты, получается, что для 50% элементов делается распределение памяти, - это может сказаться на производительности, а также негативно влияет на фрагментацию памяти. Есть задумка башни высотой не более h_min распределять одним блоком и только для высоких «дораспределять» память под башню:
template struct tower { tower * next; tower * high_next; // массив для указателей уровней >= Hmin };
Если Hmin = 4 , то при таком построении 93% элементов не потребуют распределения дополнительной памяти под указатели next - high_next для них будет nullptr .

skip-list: библиография


W.Pugh спустя год-два после опубликования своей работы о skip-list представил другую работу, посвященную конкурентной реализации, основанной на аккуратной расстановке блокировок (fine-grained locks).
Наиболее цитируемая работа по lock-free skip-list – это диссертация K.Fraser Practical lock-freedom 2003 года, в которой представлено несколько алгоритмов skip-list как на основе транзакционной памяти, так и на программной реализации MCAS (CAS над несколькими ячейками памяти). Эта работа представляет сейчас, на мой взгляд, чисто теоретический интерес, так как программная эмуляция транзакционной памяти довольно тяжела, равно как и MCAS. Хотя с выходом Haswell реализация транзакционной памяти в железе пошла в массы…
Реализация skip-list в libcds сделана по мотивам неоднократно цитируемой мною монографии The Art of Multiprocessor Programming . «По мотивам» потому, что оригинальный алгоритм приведен для Java и имеет, как мне кажется, несколько ошибок, которые, быть может, были исправлены во втором издании. Или это вовсе не ошибки для Java.
Есть ещё диссертация Фомичева , посвященная вопросу возобновления поиска в случае обнаруженных конфликтов в skip-list. Как я упоминал выше, find() при обнаружении помеченного (marked) элемента пытается удалить его из списка и возобновляет поиcк с самого начала . Фомичев предлагает механизм расстановки back-reference – обратных ссылок, чтобы работу возобновлять с некоторого предыдущего элемента, а не с начала, что, в принципе, должно повлиять на производительность в лучшую сторону. Алгоритм поддержки back-reference в актуальном состоянии довольно сложный и непонятно, будет ли выигрыш или же его съест код поддержки актуальности.

В следующей статье попытаемся рассмотреть другой обширный класс структур данных, являющихся основой для ассоциативных контейнеров, - деревья, точнее, их конкурентную реализацию.

Существует довольно распространённое мнение, что выполнение различных тестовых заданий помогает очень быстро поднять свой профессиональный уровень. Я и сам люблю иногда откопать какое-нить мудреное тестовое и порешать его, чтобы быть постоянно в тонусе, как говорится. Как-то я выполнял конкурсное задание на стажировку в одну компанию, задачка показалась мне забавной и интересной, вот её краткий текст:
Представьте, что ваш коллега-нытик пришел рассказать о своей непростой задаче - ему нужно не просто упорядочить по возрастанию набор целых чисел, а выдать все элементы упорядоченного набора с L-го по R-й включительно!
Вы заявили, что это элементарная задача и, чтобы написать решение на языке C#, вам нужно десять минут. Ну, или час. Или два. Или шоколадка «Алёнка»

Предполагается, что в наборе допускаются дубликаты, и количество элементов будет не больше, чем 10^6.

К оценке решения есть несколько комментариев:

Ваш код будут оценивать и тестировать три программиста:
  • Билл будет запускать ваше решение на тестах размером не больше 10Кб.
  • В тестах Стивена количество запросов будет не больше 10^5, при этом количество запросов на добавление будет не больше 100.
  • В тестах Марка количество запросов будет не больше 10^5.
Решение может быть очень интересным, поэтому я посчитал нужным его описать.

Решение

Пусть у нас есть абстрактное хранилище.

Обозначим Add(e) - добавление элемента в хранилище, а Range(l, r) - взятие среза с l по r элемент.

Тривиальный вариант хранилища может быть таким:

  • Основой хранилища будет упорядоченный по возрастанию динамический массив.
  • При каждой вставке бинарным поиском находится позицию, в которую необходимо вставить элемент.
  • При запросе Range(l, r) будем просто брать срез массива из l по r.
Оценим сложность подхода, основанного на динамическом массиве.

C Range(l, r) - взятие среза можно оценить, как O(r - l).

C Add(e) - вставка в худшем случае будет работать за O(n), где n - количество элементов. При n ~ 10^6, вставка - узкое место. Ниже в статье будет предложен улучшенный вариант хранилища.

Пример исходного кода можно посмотреть .

Skiplist

Skiplist - это рандомизированная альтернатива деревьям поиска, в основе которой лежит несколько связных списков. Была изобретена William Pugh в 1989 году. Операции поиска, вставки и удаления выполняются за логарифмически случайное время.

Эта структура данных не получила широкой известности (кстати, и на хабре достаточно обзорно о ней написано), хотя у нее хорошие асимптотические оценки. Любопытства ради захотелось ее реализовать, тем более имелась подходящая задача.

Пусть у нас есть отсортированный односвязный список:

В худшем случае поиск выполняется за O(n). Как его можно ускорить?

В одной из видео-лекций, которые я пересматривал, когда занимался задачкой, приводился замечательный пример про экспресс-линии в Нью-Йорке:

  • Скоростные линии соединяют некоторые станции
  • Обычные линии соединяют все станции
  • Есть обычные линии между общими станциями скоростных линий
Идея в следующем: мы можем добавить некоторое количество уровней с некоторым количеством узлов в них, при этом узлы в уровнях должны быть равномерно распределены. Рассмотрим пример, когда на каждом из вышележащих уровней располагается половина элементов из нижележащего уровня:


На примере изображен идеальный SkipList, в реальности всё выглядит подобным образом, но немного не так:)

Поиск

Так происходит поиск. Предположим, что мы ищем 72-й элемент:

Вставка

Со вставкой все немного сложнее. Для того чтобы вставить элемент, нам необходимо понять, куда его вставить в самом нижнем списке и при этом протолкнуть его на какое-то количество вышележащих уровней. Но на какое количество уровней нужно проталкивать каждый конкретный элемент?

Решать это предлагается так: при вставке мы добавляем новый элемент в самый нижний уровень и начинаем подкидывать монетку, пока она выпадает, мы проталкиваем элемент на следующий вышележащий уровень.
Попробуем вставить элемент - 67. Сначала найдем, куда в нижележащем списке его нужно вставить:

Предположил, что монетка выпала два раза подряд. Значит нужно протолкнуть элемент на два уровня вверх:

Доступ по индексу

Для доступа по индексу предлагается сделать следующее: каждый переход пометить количеством узлов, которое лежит под ним:

После того, как мы получаем доступ к i-му элементу (кстати, получаем мы его за O(ln n)), сделать срез не представляется трудным.

Пусть необходимо найти Range(5, 7). Сначала получим элемент по индексу пять:


А теперь Range(5, 7):

О реализации

Кажется естественной реализация, когда узел SkipList выглядит следующим образом:
SkipListNode {
int Element;
SkipListNode Next;
int Width;
}
Собственно, так и сделано .

Но в C# в массиве хранится еще и его длина, а хотелось сделать это за как можно меньшее количество памяти (как оказалось, в условиях задачи все оценивалось не так строго). При этом хотелось сделать так, чтобы реализация SkipList и расширенного RB Tree занимала примерно одинаковое количество памяти.

Ответ, как уменьшить потребление памяти, неожиданно был найден при ближайшем рассмотрении из пакета java.util.concurrent.

Двумерный skiplist

Пусть в одном измерении будет односвязный список из всех элементов. Во втором же будут лежать «экспресс-линии» для переходов со ссылками на нижний уровень.
ListNode {
int Element;
ListNode Next;
}

Lane {
Lane Right;
Lane Down;
ListNode Node;
}

Нечестная монетка

Еще для снижения потребления памяти можно воспользоваться «нечестной» монеткой: уменьшить вероятность проталкивания элемента на следующий уровень. В статье William Pugh рассматривался срез из нескольких значений вероятности проталкивания. При рассмотрении значений ½ и ¼ на практике получилось примерно одинаковое время поиска при уменьшении потребления памяти.

Немного о генерации случайных чисел

Копаясь в потрохах ConcurrentSkipListMap, заметил, что случайные числа генерируются следующим образом:
int randomLevel() {
int x = randomSeed;
x ^= x << 13;
x ^= x >>> 17;
randomSeed = x ^= x << 5;
if ((x & 0x80000001) != 0)
return 0;
int level = 1;
while (((x >>>= 1) & 1) != 0) ++level;
return level;
}
Подробнее о генерации псевдослучайных чисел с помощью XOR можно почитать в этой статьe . Особого увеличения скорости я не заметил, поэтому, не использовал его.

Исходник получившегося хранилища можно глянуть .

Все вместе можно забрать с googlecode.com (проект Pagination).

Тесты

Было использовано три типа хранилища:
  1. ArrayBased (динамический массив)
  2. SkipListBased (SkipList c параметром ¼)
  3. RBTreeBased (красно-черное дерево: реализация моего знакомого, который выполнял аналогичное задание).
Было проведено три вида тестов на вставку 10^6 элементов:
  1. Упорядоченные по возрастанию элементы
  2. Упорядоченные по убыванию элементы
  3. Случайные элементы
Тесты проводились на машине с i5, 8gb ram, ssd и Windows 7 x64.
Результаты:
Array RBTree SkipList
Random 127033 ms 1020 ms 1737 ms
Ordered 108 ms 457 ms 536 ms
Ordered by descending 256337 ms 358 ms 407 ms
Вполне ожидаемые результаты. Видно, что вставка в массив, когда элемент вставляется куда-нибудь, кроме как в конец, работает медленнее всего. При этом SkipList медленнее RBTree.

Также были проведены замеры: сколько каждое хранилище занимает в памяти при вставленных в него 10^6 элементах. Использовался студийный профайлер, для простоты запускался такой код:

var storage = ...
for (var i = 0; i < 1000000; ++i)
storage.Add(i);
Результаты:
Array RBTree SkipList
Total bytes allocated 8,389,066 bytes 24,000,060 bytes 23,985,598 bytes
Ещё вполне ожидаемые результаты: хранилище на динамическом массиве заняло наименьшее количество памяти, а SkipList и RBTree заняли примерно одинаковый объем.

Хеппи-энд с «Алёнкой»

Коллега-нытик, по условию задачи, проспорил мне шоколадку. Моё решение зачли с максимальным баллом. Надеюсь, кому-нибудь данная статья будет полезна. Если есть какие-то вопросы - буду рад ответить.

P.S.: я был на стажировке в компании СКБ Контур. Это чтобы не отвечать на одинаковые вопросы =)

Списки с пропусками - это структура данных, которая может применяться вместо сбалансированных деревьев. Благодаря тому, что алгоритм балансировки вероятностный, а не строгий, вставка и удаление элемента в списках с пропусками реализуется намного проще и значительно быстрее, чем в сбалансированных деревьях.

Списки с пропусками - это вероятностная альтернатива сбалансированным деревьям. Они балансируются с использованием генератора случайных чисел. Несмотря на то, что у списков с пропусками плохая производительность в худшем случае, не существует такой последовательности операций, при которой бы это происходило постоянно (примерно как в алгоритме быстрой сортировки со случайным выбором опорного элемента). Очень маловероятно, что эта структура данных значительно разбалансируется (например, для словаря размером более 250 элементов вероятность того, что поиск займёт в три раза больше ожидаемого времени, меньше одной миллионной).

Балансировать структуру данных вероятностно проще, чем явно обеспечивать баланс. Для многих задач списки пропуска это более естественное представление данных по сравнению с деревьями. Алгоритмы получаются более простыми для реализации и, на практике, более быстрыми по сравнению со сбалансированными деревьями. Кроме того, списки с пропусками очень эффективно используют память. Они могут быть реализованы так, чтобы на один элемент приходился в среднем примерно 1.33 указатель (или даже меньше) и не требуют хранения для каждого элемента дополнительной информации о балансе или приоритете.

Для поиска элемента в связном списке мы должны просмотреть каждый его узел:

Если список хранится отсортированным и каждый второй его узел дополнительно содержит указатель на два узла вперед, нам нужно просмотреть не более, чем ⌈n /2⌉ + 1 узлов(где n - длина списка):

Аналогично, если теперь каждый четвёртый узел содержит указатель на четыре узла вперёд, то потребуется просмотреть не более чем ⌈n /4⌉ + 2 узла:

Если каждый 2 i i узлов вперёд, то количество узлов, которые необходимо просмотреть, сократится до ⌈log 2 n ⌉, а общее количество указателей в структуре лишь удвоится:

Такая структура данных может использоваться для быстрого поиска, но вставка и удаление узлов будут медленными.

Назовём узел, содержащий k указателей на впередистоящие элементы, узлом уровня k . Если каждый 2 i -ый узел содержит указатель на 2 i узлов вперёд, то уровни распределены так: 50% узлов - уровня 1, 25% - уровня 2, 12.5% - уровня 3 и т.д. Но что произойдёт, если уровни узлов будут выбираться случайно, в тех же самых пропорциях? Например, так:

Указатель номер i каждого узла будет ссылаться на следующий узел уровня i или больше, а не на ровно 2 i -1 узлов вперёд, как было до этого. Вставки и удаления потребуют только локальных изменений; уровень узла, выбранный случайно при его вставке, никогда не будет меняться. При неудачном назначении уровней производительность может оказаться низкой, но мы покажем, что такие ситуации редки. Из-за того, что эти структуры данных представляют из себя связные списки с дополнительными указателями для пропуска промежуточных узлов, я называю их списками с пропусками .

Операции

Опишем алгоритмы для поиска, вставки и удаления элементов в словаре, реализованном на списках с пропусками. Операция поиска возвращает значение для заданного ключа или сигнализирует о том, что ключ не найден. Операция вставки связывает ключ с новым значением (и создаёт ключ, если его не было до этого). Операция удаления удаляет ключ. Также в эту структуру данных можно легко добавить дополнительные операции, такие как «поиск минимального ключа» или «нахождение следующего ключа».

Каждый элемент списка представляет из себя узел, уровень которого был выбран случайно при его создании, причём независимо от числа элементов, которые уже находились там. Узел уровня i содержит i указателей на различные элементы впереди, проиндексированные от 1 до i . Мы можем не хранить уровень узла в самом узле. Количество уровней ограничено заранее выбранной константой MaxLevel . Назовём уровнем списка максимальный уровень узла в этом списке (если список пуст, то уровень равен 1). Заголовок списка (на картинках он слева) содержит указатели на уровни с 1 по MaxLevel . Если элементов такого уровня ещё нет, то значение указателя - специальный элемент NIL.

Инициализация

Создадим элемент NIL, ключ которого больше любого ключа, который может когда-либо появиться в списке. Элемент NIL будет завершать все списки с пропусками. Уровень списка равен 1, а все указатели из заголовка ссылаются на NIL.

Поиск элемента

Начиная с указателя наивысшего уровня, двигаемся вперед по указателям до тех пор, пока они ссылаются на элемент, не превосходящий искомый. Затем спускаемся на один уровень ниже и снова двигаемся по тому же правилу. Если мы достигли уровня 1 и не можем идти дальше, то мы находимся как раз перед элементом, который ищем (если он там есть).

Search (list, searchKey)
x:= list→header
# инвариант цикла: x→key < searchKey
for i:= list→level downto 1 do
while x→forward[i]→key < searchKey do
x:= x→forward[i]
# x→key < searchKey ≤ x→forward→key
x:= x→forward
if x→key = searchKey then return x→value
else return failure

Вставка и удаление элемента

Для вставки или удаления узла применяем алгоритм поиска для нахождения всех элементов перед вставляемым (или удаляемым), затем обновляем соответствующие указатели:


В данном примере мы вставили элемент уровня 2.

Insert (list, searchKey, newValue)
local update
x:= list→header
for i:= list→level downto 1 do
while x→forward[i]→key < searchKey do
x:= x→forward[i]
# x→key < searchKey ≤ x→forward[i]→key
update[i] := x
x:= x→forward
if x→key = searchKey then x→value:= newValue
else
lvl:= randomLevel()
if lvl > list→level then
for i:= list→level + 1 to lvl do
update[i] := list→header
list→level:= lvl
x:= makeNode(lvl, searchKey, value)
for i:= 1 to level do
x→forward[i] := update[i]→forward[i]
update[i]→forward[i] := x

Delete (list, searchKey)
local update
x:= list→header
for i:= list→level downto 1 do
while x→forward[i]→key < searchKey do
x:= x→forward[i]
update[i] := x
x:= x→forward
if x→key = searchKey then
for i:= 1 to list→level do
if update[i]→forward[i] ≠ x then break
update[i]→forward[i] := x→forward[i]
free(x)
while list→level > 1 and list→header→forward = NIL do
list→level:= list→level – 1

Для запоминания элементов перед вставляемым(или удаляемым) используется массив update . Элемент update[i] - это указатель на самый правый узел, уровня i или выше, из числа находящихся слева от места обновления.

Если случайно выбранный уровень вставляемого узла оказался больше, чем уровень всего списка (т.е. если узлов с таким уровнем ещё не было), увеличиваем уровень списка и инициализируем соответствующие элементы массива update указателями на заголовок. После каждого удаления проверяем, удалили ли мы узел с максимальным уровнем и, если это так, уменьшаем уровень списка.

Генерация номера уровня

Ранее мы приводили распределение уровней узлов в случае, когда половина узлов, содержащих указатель уровня i , также содержали указатель на узел уровня i +1. Чтобы избавиться от магической константы 1/2, обозначим за p долю узлов уровня i , содержащих указатель на узлы уровня i +i. Номер уровня для новой вершины генерируется случайно по следующему алгоритму:

randomLevel ()
lvl:= 1
# random() возвращает случайное число в полуинтервале }