Мащабно матрично умножение с pyspark (или - как да се съпоставят два големи набора от данни с имена на компании)

Spark и pyspark имат прекрасна поддръжка за надеждно разпределение и паралелизация на програмите, както и поддръжка за много основни алгебрични операции и алгоритми за машинно обучение.

В тази публикация описваме мотивацията и средствата за извършване на съвпадение на име по име на два големи набора от данни с имена на компании с помощта на Spark.

Мотивация на първо място

Нашата цел е да съпоставим два големи набора от имена на компании. Разглеждаме два дълги списъка с имена на фирми, списък А и списък Б и се стремим да съпоставим компании от А до компании от Б.

Обикновено ще имаме нещо подобно:

Списък А | Списък Б
---------------------
GOOGLE INC. | Google
MEDIUM.COM | Среден Инк
Амазонски лаборатории | Амазонка
Google, вкл. |
Yahoo |
             | Microsoft

В този пример нашата цел е да съпоставим както GOOGLE INC., Така и Google, вкл. (От списък А) до Google (от списък Б); и да съответства на MEDIUM.COM на Medium Inc; и лаборатории на Amazon до Amazon и т.н. ...

Разглеждайки този прост пример, няколко неща се открояват:

  • Възможно е повече от една компания от A да се сравни с една компания от B; това е връзка много към един
  • Имената на компании в повечето случаи са лесни за съпоставяне на хора (напр. Лаборатории на Amazon до Amazon), но не са толкова лесни за компютрите (как компютърът знае, че „лабораториите“ в този случай са незначителни и че „Amazon“ е просто съкратено за „Amazon labs“?)
  • Не всички компании от А имат съвпадения в Б, а не всички компании от Б са съвпадащи с А. В нашия пример Yahoo от списък А не е съвпаднал с никоя друга компания в Б, а Microsoft от Б не е съпоставен с нито една компания от А ,
  • Всеки елемент от списък А трябва да има най-много едно съвпадение от B. Обратното не е вярно, както е отбелязано, много компании от А могат да бъдат съпоставени в една компания на B.

Първи опит - тривиален мач

Добре, отначало решихме, че ще опитаме най-простото и тривиално решение, за да видим колко добре работи, ако не за друго, най-малкото, за да създадем основна линия за бъдещи опити. Най-простото нещо е просто нечувствителен към регистъра тест за низово уравнение. Просто съпоставете струните от A до струните от B.

Прецизност и припомняне

Трябва да се разгледат две подходящи мерки: Прецизност и припомняне. Прецизността е „колко грешки направихме (не) направихме“, или с други думи - като се имат предвид всички мачове, колко от тях наистина са правилни съвпадения - точни съвпадения. Така че точността се отнася до фалшивите положителни резултати.

Припомняме, от друга страна е „колко мача е трябвало да бъдат намерени, но са пропуснати“. Така че припомнянето е за лъжливите негативи.

Първият тривиален опит беше, както се очакваше, с висока точност, но с ниско припомняне. Ако разгледаме кратък списък от примерни компании, той би съвпаднал с нулеви елементи от А до Б. Това е много лошо припомняне, но 100% точност ;-). Разбира се, в реалния сценарий това ще съвпадне малко повече от нула, но е лесно да се види, че поради много малки възможни вариации при извикването на имена на компании ще остане ниско.

Просто подобрение на това би било премахването на стоп-думи.

Спрете думите

Какво са стоп-думи? Стоп думите са думи, които се премахват преди обработването на различни NLP алгоритми, защото не добавят информация, обикновено те просто добавят шум. В обикновените английски стоп-думи обикновено са "на", "за" "ако" и т.н. на езика, това са много често срещани думи, които се използват много, но за много NLP и IR алгоритмите не добавят информация. Те правят правилни синтактични изречения и в много случаи засягат семантиката, но на ниво много процесори NLP, които не гледат на действителния синтаксис, те са безсмислени.

В нашия случай стоп-думите не са „ако“, „на“ или „за“, които са типични за английски, но са „inc“ и „llc“ от разширението на компанията. Така че нашето просто подобрение е просто да премахнете всички тези разширения на компанията и да опитате простото низово уравнение отново.

Това наистина помогна и както можете да видите в нашия пример, това помогна за съвпадение на „Google вкл.“ С „Google“ и с просто премахване на препинателни знаци и правилна токенизация, ние също съвпадаме с „Google, вкл.“ С „Google“. Но това все още не съвпада „Amazon labs“ с „Amazon“ b / c „labs“ не е стоп-дума в смисъл, че не е обичайно разширение на компанията. Както се оказва, „лаборатории на Amazon“ не е само случаен пример, много имена на компании имат тези вариации в имената си, които се проявяват в един набор от данни, но не се проявяват в други набори от данни. Извод: трябва да намерим начин да „погледнем отвъд това“, да игнорираме „лабораториите“ в „лаборатории на Амазонка“.

Нека се срещнем с науката.

Науката

Това, което разглеждаме тук, е проблемът за съвпадение на N документи от списък А до М документи от списък Б, в много отношения към един. Но нашият съвпадащ алгоритъм трябва да бъде „умен“, в смисъл, че трябва да може да прави разлика между „важни думи“ и „не важни думи“. Трябва да намерим начин да кажем на компютъра, че „лабораториите“ в „лаборатории на амазон“ са незначителни, но „амазон“ наистина е значителен. Ние бихме тривиално токенизирали имената на по-малки символи, като се разделим на интервали, препинателни знаци и т.н., така че „medium.com“ да се раздели на „medium“ и „com“.

Наука за спасяване!

TF-IDF

За тази цел използваме обща схема в теорията за извличане на информация, наречена TF-IDF. TF-IDF означава Term Frequency - обърната честота на документа. Терминът Честота означава просто „колко пъти тази дума се появява в този документ“ (нашите документи са само имена на компании, така че те са много кратки „документи“). Така че в случая с „Amazon labs“ имаме само две думи в документа „amazon“ и „labs“ и тяхната честота е просто 1 и 1. (Ако между другото името на компанията просто се е случило като „amazon labs“ amazon “, тогава броят щеше да бъде 2 за„ amazon “и 1 за„ лаборатории “.) За това е TF, съвсем просто: пребройте честотата на термините в документа.

Обърнатата честота на документа е истинската сделка. Обърнатата честота на документите разглежда всички „документи“ (известен още като корпус, всички имена на компании) и тества колко често думата „лаборатории“ се появява във всички тях. Ако думата „amazon“ се появява само в един документ, това означава, че „amazon“ е значима дума, но ако думата „labs“ се появява в много други документи (напр. Много компании използват думата „labs“ като част от своите име) тогава това означава, че думата „лаборатории“ е незначителна. IDF е точно това - в колко документи се появява думата.

TF-IDF е TF на термина, разделен на IDF на термина. Той осигурява добро измерване на това колко важни или колко важни са думите в контекста на конкретни документи.

Лесно е да се изчисли TF-IDF матрицата за набор от документи. Има готови библиотеки, които правят това, и ние използвахме прилагането на scikit-learn за това.

TF-IDF матрицата е двуизмерна матрица, в която редовете представляват документи (в нашия случай - имена на компании), а колоните представляват уникални символи (или думи). Ако искахме да изградим TF-IDF матрицата на нашия малък корпус от списък А, това ще изглежда нещо подобно (след премахването на стоп-думи, препинателни знаци и понижаване на всичко):

           | google | средно | com | yahoo | амазон | лаборатории
-------------------------------------------------- ---------
GOOGLE INC. | 1 0 0 0 0 0
MEDIUM.COM | 0 .77 .63 0 0 0
Амазонски лаборатории | 0 0 0 0 .7 .7
Google, вкл. | 1 0 0 0 0 0
Yahoo | 0 0 0 1 0 0
com | 0 0 1 0 0 0

Ето кода:

от sklearn.feature_extraction.text import TfidfVectorizer
matrix = vectorizer.fit_transform ([„GOOGLE“, „MEDIUM.COM“, „лаборатории на Amazon“, „Google“, „Yahoo“, „com“])

Матрицата, която беше създадена, е NxM, където N = брой компании и M = брой уникални маркери.

Ще забележите, че добавихме друга (съставена) компания, наречена „com“. Направихме това, за да демонстрираме важно свойство на TF-IDF. Използваме TF-IDF, за да различаваме значимите и незначителните символи в документите. Значителен знак в документ е означение, което не само се появява в документа често, но също е сравнително рядко в целия корпус. Ако терминът се появява многократно в корпуса, той става по-малко значим за този конкретен документ. Добавихме съставената компания „com“, така че „Medium“ в рамките на „Medium.com“ става по-значим. (Вие ще забележите, че „средното“ тегло е .77, докато „com“ тегло е .63, и това се дължи на появата на „com“ в друг документ, следователно IDF е по-нисък).

Разбира се, в реалния свят трябва да имате десетки или стотици имена на компании с обозначението „com“ или „labs“, така че да видите съществена разлика между „Medium“ и „com“ в името Medium.com.

Прилика на козина

Следващата стъпка след изчисляване на матрицата TF-IDF за двете страни (и двата списъка A и B на компаниите) е умножаването на матриците.

Умножаването на матриците осигурява интересна мярка, наречена сходство на косинусите. Приликата с косинус е просто измерване на сходството, което варира между 0 и 1. Стойност 1 показва идентични елементи, а венома от 0 показва напълно различни елементи (точно както прави косинусната триъгълна функция). Умножаването на матриците осигурява косинусното сходство между всеки елемент от списък А на всеки елемент от списък Б. В действителност ние умножаваме A по B.T (B.transpose), така че размерите да се поберат. Интересното за косинусното сходство между TF-IDF матриците е, че резултатът е матрица на сходствата между всеки елемент в A с всеки елемент в B, като същевременно се отчита значението на маркери в имената. Обикновено резултат от> .8 означава валиден мач.

За щастие пакетът python sklearn осигурява проста функция на косинус-сходство, която приема две матрици и води до косинусното сходство на тези две. Ето някои демо код:

от sklearn.feature_extraction.text import TfidfVectorizer
от sklearn.metrics.pairwise импортира cosine_s подобниity
a = vectorizer.fit_transform (['aa', 'bb', 'aa bb', 'aa aa bb'])
b = vectorizer.fit_transform (['aa', 'bb'])
cosparentities = cosine_s подобниity (a, b)

Резултатът е матрица на приликите между всеки елемент в A с всеки елемент в b.

           аа | бб
----------------------
аа | 1 | 0
bb | 0 | 1
aa bb | .7 | 0.7
aa aa bb | .89 | 0.44

Както се очаква, думата „aa“ от a е много подобна на думата „aa“ от b (забележете 1). Можете също така да видите, че „aa bb“ е еднакво подобен на „aa“ и „bb“ и това също има смисъл. И накрая ще забележите, че „aa aa bb“ има по-голямо сходство с „aa“, отколкото при „bb“. Всичко това има смисъл.

Нека обобщим науката тук. Първо вземаме два списъка с документи и за всеки набор изчисляваме, че е TF-IDF матрица *. След това умножаваме двете матрици, за да излезе с косинусното им сходство, което е матрица, която описва сходството между всеки документ в А с всеки документ в Б.

Втори опит - умножено матрично умножение

След като видяхме науката, искаме да опитаме това на практика. Следващият опит е да заредите всички реални компании от списък А и да заредите всички реални компании от списък Б и да умножите дяволите от тях, използвайки функцията cosine_similatiry.

По-лесно казано, отколкото направено.

В малък мащаб това просто работи и работи много добре. Например с няколко хиляди компании от всяка страна, които биха работили. Въпреки това с нашата база данни, в която имаме няколко стотин хиляди имена във всеки списък, до няколко милиона, това става предизвикателство.

Простото изчисляване на TF-IDF е възможно, дори и при толкова големи набори от данни, на един хост (моя лаптоп работи толкова лесно, за няколко секунди). Умножаването на матриците обаче е мястото, където е истинското предизвикателство.

Локалното умножение не мащабира

Да приемем, че имаме 1M (10⁶) имена във всеки списък. Всяка матрица би била приблизително от порядъка на 10⁶ x 10⁶ (тъй като броят на уникалните символи е сходен с броя на компаниите поради уникалността на имената). Създаването на 10⁶ x 10⁶ матрица в паметта означава 10¹² плава. Питон поплавък отнема 16 байта, така че завършваме с 16 * 10¹² байта, което се превежда на ~ 4PT (четири петабайта) RAM. Можем ли да запазим 4PT матрица в паметта? Е, разбира се, че не, поне не на изтъркания ми лаптоп. Но - има един трик. Не е нужно да запазваме всичко това в паметта. Имайте предвид, че въпреки че матрицата е 1M на 1M, на практика тя е предимно запълнена с нули. И така, защо да си правим труда да поддържаме всички тези нули в паметта? Вместо това можем да използваме оскъдно представяне на матрицата, в което вместо да запазим двумерната матрица в паметта с помощта на масиви от масиви, можем вместо това да следим само координатите на ненулевите елементи и да приемем, че всички останали са просто нули. Това е чудесно за запазване на ниския отпечатък на паметта, както и за извършване на бързи операции за умножение на матрицата. В интерес на истината, това е точно това, което sklearn вече прави. Запазването на матриците като оскъдни матрици означава, че трябва да разпределяме само около 1M флоата (стойности), плюс 2M цели числа (индекси на нулеви елементи), но всичко това е около 24Mbytes, което е доста лесно.

Но - умножаването на двете матрици, дори и да са оскъдни, би означавало най-малко 10¹² операции (ако сме умни с нулите). Сега това е малко по-трудно. И въпреки че numpy (който се крие под sklearn) е много добър при толкова бърза математика, това нещо е малко предизвикателно дори за numpy.

Опитахме това - просто умножаваме тези две матрици. Работи добре за достатъчно малък размер на матрицата, но при някои числа (които са много по-малки от желаните от нас) започна да се проваля и да изчерпа паметта. Сега бихме могли да работим с това, като разделим една от матриците на по-малки парчета и пускаме число или умножение една след друга и след това обобщаваме всички неща. Но този вид ни напомни, че вече знаем тази система, която прави това, тя се нарича Spark.

Трети опит - искрено умножение на матрицата

Spark е чудесен за силно паралелизирани изчисления с интензивна памет и ето, той има тип данни BlockMatrix, който осъществява операция за мултиплициране. Изглежда точно това, което търсихме! Добре, така че създаваме TF-IDF матрици и ги преобразуваме в BlockMatrix на Spark и изпълняваме a.multiply (b.transpose ()), което е повече или по-малко от това, което прави cosine_similatiry.

# Псевдокод ...
a_mat = tfidf_vect.fit_transform ([..., ..., ...])
b_mat = tfidf_vect.fit_transform ([..., ..., ...])
a_block_mat = create_block_matrix (a)
b_block_mat_tr = create_block_matrix (b.transpose ())
cos подобниities = a_block_mat.multiply (b_block_mat_tr)

Това изглежда достатъчно лесно и наистина е така. Но има „но“ разбира се… това нещо, макар и просто и работи правилно от математическа гледна точка - добре, че не мащабира… Ние сме в състояние да умножим големи матрици, но не толкова големи, колкото бихме искали , Опитахме да играем с размера на блока и т.н. уви, за достатъчно големи входове той не успява или с грешки извън паметта, или просто с дълги изпълнения, които никога не свършват (часове и часове).

Какъв е проблема? Не можете да запалите мащаба?

Разбира се искрата може да се мащабира. Но трябва да го използвате разумно, глупаво ... Проблемът с BlockMatrix е, че за да осъществи операцията за умножение Spark преобразува оскъдните блокове на матрицата в плътни (под) матрици. И въпреки че по-голямата част от нашата матрица е нули, искрата все пак ще преобразува всички тези нули в плътно представяне, което или ще консумира твърде много памет, или ако поддържаме размера на блоковете малък, би довело до прекалено много операции, прераздели и т.н. и би стартирало завинаги.

Spark поддържа оскъдни матрици, но тези матрици не осъществяват операцията за умножение (известна още като точка), а единствената разпределена матрица, която изпълнява операцията за умножение към момента на писане, е BlockMatrix, който, както бе отбелязано, превръща оскъдното представяне в плътно представяне, преди да ги умножим. Трябва да отбележим, че в искровата общност е имало дискусии за начините за прилагане на разпределено размножено матрично умножение, обаче, както беше отбелязано - към момента на писането това все още не е приложено.

BlockMatrix.multiply () не бе успешно. Какво следва?

Четвърти опит - и победителят е ...

Нашият четвърти и последен опит беше успешен. Идеята е да се смесват и да се съпоставят Spark с numpy. Нашите тестове показват, че numpy е в състояние да умножи по-малка матрица с по-голяма матрица, така че ако вземем само малка част от матрица A и умножим тази по матрица B, това ще работи и numpy няма да избухне. И ако си спомняте уроците си по алгебра, умножаването на матриците може да се прави вектор по вектор, така че да е алгебрично-мъдро, че все пак би било правилно. Идеята е да разделите само една от матриците на по-малки парчета и след това всеки работник на Spark да задейства умножението на парчето си и след това да върне само заключението, напр. заключението може да бъде, че името в A [13] съвпада с името в B [21] и т.н.

Излъчване и паралелизиране за спасяването

Spark има две полезни възможности: излъчване и паралелизиране. Предаването просто излъчва същите данни на всички работници. Ние използваме излъчване, за да изпратим матрица B на всички работници, така че всички работници да имат пълната B матрица. Parallelize отрязва данните в дялове и изпраща всеки дял на различен работник. Използваме паралелно, за да изпращаме парчета от A на работниците, така че всеки работник да има всички B, но само малка част от A.

Ето общия контур:

  1. Изчислете TF-IDF матрици на водача.
  2. Паралелизиране на матрица А; Излъчваща матрица B
  3. Всеки работник сега изравнява своята част от работата, като умножава парчето си от матрица A с цялата матрица B. Така че, ако работникът оперира на A [0:99], той ще умножи тези сто реда и ще върне резултата от, да речем, A [13 ] съвпада с име, намерено в B [21]. Умножението става с помощта на numpy.
  4. Водачът ще събере всички резултати от различните работници и ще съпостави индексите (A [13] и B [21]) с действителните имена в оригиналния набор от данни - и ние сме готови!

Този метод работи много добре и всъщност, когато се проведе за първи път, беше толкова приятна изненада, че решихме, че просто не работи (но се получи…). В сравнение с предишните методи, които или се изпълниха с часове (и не завършиха), или изчерпаха паметта си, или този памет, този метод беше в състояние да завърши изчисленията си за няколко минути. Разбира се, това зависи от размера на данните и размера на клъстера на Spark, но като цяло той се справи много добре.

В момента единственото тясно място е шофьорът, изчисляващ TF-IDF матрици, а на тази предна част все още имаме тонове лакът пространство, защото това изчисление все още е доста лесно за sklearn. (странична забележка: Spark също реализира разпределено TF-IDF изчисление, но не е необходимо да го използваме).

Ето псевдокод, който илюстрира нашето решение:

от sklearn.feature_extraction.text import TfidfVectorizer
от sklearn.feature_extraction.text import CountVectorizer
от sklearn.metrics.pairwise импортира cosine_s подобниity
# те реалистично ще бъдат прочетени от файлове или рамки от данни.
a = ['google inc', 'medium.com', ...]
b = ['google', 'microsoft', ...]
stopwords = ['ltd', ...]
vect = CountVectorizer (stop_words = stopwords)
# това може да се направи с по-малко оперативна памет с помощта на генератор
лексика = vect.fit (a + b) .vocabulary_
tfidf_vect = TfidfVectorizer (stop_words = stopwords,
                             речник = речник)
a_mat = tfidf_vect.fit_transform (a)
b_mat = tfidf_vect.fit_transform (b)
a_mat_para = paralelize_matrix (a_mat, rows_per_chunk = 100)
b_mat_dist = излъчване_матрица (a_mat)
a_mat_para.flatMap (
        ламбда подматрица:
        find_matches_in_submatrix (csr_matrix (подматрица [1],
                                             форма = подматрица [2]),
                                   b_mat_dist,
                                   подматрица [0]))
def find_matches_in_submatrix (източници, цели, inputs_start_index,
                              праг = 0.8):
    cosparentities = cosine_s подобниity (източници, цели)
    за i, сходство в изброяване (съпоставимост):
        cosparentity = cosparentity.flatten ()
        # Намерете най-доброто съвпадение, като използвате argsort () [- 1]
        target_index = cos подобниity.argsort () [- 1]
        source_index = inputs_start_index + i
        прилика = съпоставимост [target_index]
        ако подобие [target_index]> = праг:
            доходност (source_index, target_index, сходство)
дефиниране_матрица (мат):
    bcast = sc.broadcast ((mat.data, mat.indices, mat.indptr))
    (данни, индекси, indptr) = bcast.value
    bcast_mat = csr_matrix ((данни, индекси, indptr), форма = mat.shape)
    връщане bcast_mat
деф paralelize_matrix (scipy_mat, rows_per_chunk = 100):
    [редове, cols] = scipy_mat.shape
    i = 0
    подматрици = []
    докато аз <редове:
        current_chunk_size = min (rows_per_chunk, редове - i)
        submat = scipy_mat [i: i + current_chunk_size]
        submatrices.append ((i, (submat.data, submat.indices,
                                submat.indptr),
                            (current_chunk_size, cols)))
        i + = current_chunk_size
    връщане sc.parallelize (подматрици)

Ще забележите, че след излъчването и паралела, ние отново сглобяваме матрицата в scipy csr_matrix, от което произлиза. Така че това, което всъщност правим, е - ние сериализираме матриците над телта и след това ги събираме от другата страна, върху работниците. Сериализацията е ефективна, тъй като трябва само да изпратим ненулевите елементи на оскъдната матрица. Така че за матрица от 1M елементи изпращаме само около 1M поплавъка, заедно с 2M ints, което определено е в зоната на комфорт на Spark.

заключение

Описваме метод за намиране на сходство между два списъка от низове A и B, които описват имената на компании. Използвахме TF-IDF и косинусоподобността като фактор на сходство.

След това показваме различни опити за мащабируемо изпълнение на матрично умножение с помощта на искра и печеливш метод, който комбинира числено матрично умножение заедно с излъчващите и паралелни възможности на искрата.

* Една фина точка, която трябва да се спомене: речникът на двете матрици трябва да е еднакъв. С други думи броят на редовете в двете матрици трябва да е равен и те трябва да имат точно еднакъв ред, напр. всеки ред представлява термин и редът на редовете трябва да е абсолютно еднакъв между матрица A и матрица B. Това става лесно, като първо се изчисли речникът и едва след това се изчисли TF-IDF, както в следния пример:

от sklearn.feature_extraction.text import TfidfVectorizer
от sklearn.feature_extraction.text import CountVectorizer
от sklearn.metrics.pairwise импортира cosine_s подобниity
a = ['google inc', 'medium.com']
b = ['google', 'microsoft']
company_name_stopwords =rozenset (['ltd', 'llc', 'inc'])
vect = CountVectorizer (stop_words = company_name_stopwords)
лексика = vect.fit (a + b) .vocabulary_
tfidf_vect = TfidfVectorizer (stop_words = company_name_stopwords,
                             речник = речник)
a_mat = tfidf_vect.fit_transform (a)
b_mat = tfidf_vect.fit_transform (b)
cos подобниities = cosine_s подобниity (a_mat, b_mat)