PostgreSQL
 sql >> डेटाबेस >  >> RDS >> PostgreSQL

अपाचे स्पार्क के साथ प्राथमिक कुंजी

स्कैला :

यदि आपको केवल विशिष्ट संख्याओं की आवश्यकता है तो आप zipWithUniqueId . का उपयोग कर सकते हैं और डेटाफ़्रेम को फिर से बनाएँ। पहले कुछ आयात और डमी डेटा:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

आगे उपयोग के लिए स्कीमा निकालें:

val schema = df.schema

आईडी फ़ील्ड जोड़ें:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

डेटाफ़्रेम बनाएँ:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

पायथन . में वही बात :

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

यदि आप लगातार संख्या पसंद करते हैं तो आप zipWithUniqueId . को बदल सकते हैं zipWithIndex . के साथ लेकिन यह थोड़ा अधिक महंगा है।

सीधे DataFrame के साथ एपीआई :

(सार्वभौमिक स्कैला, पायथन, जावा, आर काफी हद तक समान वाक्यविन्यास के साथ)

पहले मैं monotonicallyIncreasingId . से चूक गया था फ़ंक्शन जो तब तक ठीक काम करना चाहिए जब तक आपको लगातार संख्याओं की आवश्यकता न हो:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

जबकि उपयोगी monotonicallyIncreasingId गैर नियतात्मक है। न केवल आईडी निष्पादन से निष्पादन में भिन्न हो सकते हैं बल्कि अतिरिक्त ट्रिक्स के बिना पंक्तियों की पहचान करने के लिए उपयोग नहीं किया जा सकता है जब बाद के संचालन में फ़िल्टर होते हैं।

नोट :

rowNumber . का उपयोग करना भी संभव है विंडो फ़ंक्शन:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

दुर्भाग्य से:

<ब्लॉकक्वॉट>

चेतावनी विंडो:विंडो संचालन के लिए कोई विभाजन परिभाषित नहीं है! सभी डेटा को एक ही पार्टीशन में ले जाने से, यह गंभीर प्रदर्शन गिरावट का कारण बन सकता है।

इसलिए जब तक आपके पास अपने डेटा को विभाजित करने और विशिष्टता सुनिश्चित करने का एक स्वाभाविक तरीका नहीं है, इस समय विशेष रूप से उपयोगी नहीं है।



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Pgbackrest समय लक्ष्य बहाल करें

  2. PostgreSQL में वर्तमान समय (समय क्षेत्र के बिना) कैसे प्राप्त करें

  3. PostgreSQL + हाइबरनेट + स्प्रिंग ऑटो डेटाबेस बनाएँ

  4. Django AutoField प्रारंभ मान को संशोधित करें

  5. pgadmin4 :postgresql एप्लिकेशन सर्वर से संपर्क नहीं किया जा सका।