स्कैला :
यदि आपको केवल विशिष्ट संख्याओं की आवश्यकता है तो आप zipWithUniqueId
. का उपयोग कर सकते हैं और डेटाफ़्रेम को फिर से बनाएँ। पहले कुछ आयात और डमी डेटा:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
आगे उपयोग के लिए स्कीमा निकालें:
val schema = df.schema
आईडी फ़ील्ड जोड़ें:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
डेटाफ़्रेम बनाएँ:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
पायथन . में वही बात :
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
यदि आप लगातार संख्या पसंद करते हैं तो आप zipWithUniqueId
. को बदल सकते हैं zipWithIndex
. के साथ लेकिन यह थोड़ा अधिक महंगा है।
सीधे DataFrame
के साथ एपीआई :
(सार्वभौमिक स्कैला, पायथन, जावा, आर काफी हद तक समान वाक्यविन्यास के साथ)
पहले मैं monotonicallyIncreasingId
. से चूक गया था फ़ंक्शन जो तब तक ठीक काम करना चाहिए जब तक आपको लगातार संख्याओं की आवश्यकता न हो:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
जबकि उपयोगी monotonicallyIncreasingId
गैर नियतात्मक है। न केवल आईडी निष्पादन से निष्पादन में भिन्न हो सकते हैं बल्कि अतिरिक्त ट्रिक्स के बिना पंक्तियों की पहचान करने के लिए उपयोग नहीं किया जा सकता है जब बाद के संचालन में फ़िल्टर होते हैं।
नोट :
rowNumber
. का उपयोग करना भी संभव है विंडो फ़ंक्शन:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
दुर्भाग्य से:
<ब्लॉकक्वॉट>चेतावनी विंडो:विंडो संचालन के लिए कोई विभाजन परिभाषित नहीं है! सभी डेटा को एक ही पार्टीशन में ले जाने से, यह गंभीर प्रदर्शन गिरावट का कारण बन सकता है।
इसलिए जब तक आपके पास अपने डेटा को विभाजित करने और विशिष्टता सुनिश्चित करने का एक स्वाभाविक तरीका नहीं है, इस समय विशेष रूप से उपयोगी नहीं है।