HBase
 sql >> डेटाबेस >  >> NoSQL >> HBase

अपाचे काफ्का में मजबूत संदेश क्रमांकन अपाचे एवरो का उपयोग, भाग 1

अपाचे काफ्का में, जावा एप्लिकेशन जिन्हें प्रोड्यूसर कहा जाता है, काफ्का क्लस्टर (दलालों से बना) को संरचित संदेश लिखते हैं। इसी तरह, जावा एप्लिकेशन जिन्हें उपभोक्ता कहा जाता है, इन संदेशों को एक ही क्लस्टर से पढ़ते हैं। कुछ संगठनों में, उत्पादकों और उपभोक्ताओं को लिखने और प्रबंधित करने के लिए अलग-अलग समूह होते हैं। ऐसे मामलों में, उत्पादकों और उपभोक्ताओं के बीच सहमत संदेश प्रारूप के समन्वय में एक प्रमुख दर्द बिंदु हो सकता है।

यह उदाहरण दर्शाता है कि कैसे अपाचे काफ्का को बनाए गए रिकॉर्ड को क्रमबद्ध करने के लिए अपाचे एवरो का उपयोग करें, जबकि स्कीमा के विकास और निर्माता और उपभोक्ता अनुप्रयोगों के गैर-समकालिक अद्यतन की अनुमति दें।

क्रमांकन और अक्रमांकन

एक काफ्का रिकॉर्ड (जिसे पहले संदेश कहा जाता था) में एक कुंजी, एक मान और शीर्षलेख होते हैं। काफ्का रिकॉर्ड की कुंजी और मूल्य में डेटा की संरचना से अवगत नहीं है। यह उन्हें बाइट सरणियों के रूप में संभालता है। लेकिन सिस्टम जो काफ्का से रिकॉर्ड पढ़ते हैं, उन रिकॉर्ड्स में डेटा की परवाह करते हैं। तो आपको एक पठनीय प्रारूप में डेटा तैयार करने की आवश्यकता है। आपके द्वारा उपयोग किया जाने वाला डेटा प्रारूप

. होना चाहिए
  • संक्षिप्त रहें
  • एनकोड और डिकोड करने के लिए तेज़ रहें
  • विकास की अनुमति दें
  • अपस्ट्रीम सिस्टम (जो काफ्का क्लस्टर को लिखते हैं) और डाउनस्ट्रीम सिस्टम (जो एक ही काफ्का क्लस्टर से पढ़ते हैं) को अलग-अलग समय पर नए स्कीमा में अपग्रेड करने की अनुमति दें

उदाहरण के लिए, जेएसओएन स्वयं व्याख्यात्मक है लेकिन एक कॉम्पैक्ट डेटा प्रारूप नहीं है और पार्स करने में धीमा है। एवरो एक तेज़ क्रमांकन ढांचा है जो अपेक्षाकृत कॉम्पैक्ट आउटपुट बनाता है। लेकिन एवरो रिकॉर्ड्स को पढ़ने के लिए, आपको उस स्कीमा की आवश्यकता होती है जिसके साथ डेटा को क्रमबद्ध किया गया था।

एक विकल्प स्कीमा को रिकॉर्ड के साथ ही स्टोर और ट्रांसफर करना है। यह उस फ़ाइल में ठीक है जहाँ आप स्कीमा को एक बार संग्रहीत करते हैं और इसे अधिक संख्या में रिकॉर्ड के लिए उपयोग करते हैं। हालाँकि, प्रत्येक काफ्का रिकॉर्ड में स्कीमा को संग्रहीत करना, भंडारण स्थान और नेटवर्क उपयोग के मामले में महत्वपूर्ण ओवरहेड जोड़ता है। एक अन्य विकल्प पहचानकर्ता-स्कीमा मैपिंग का एक सहमत-सेट सेट करना है और रिकॉर्ड में उनके पहचानकर्ताओं द्वारा स्कीमा को संदर्भित करना है।

ऑब्जेक्ट से काफ्का रिकॉर्ड और बैक तक

निर्माता अनुप्रयोगों को डेटा को सीधे बाइट सरणियों में बदलने की आवश्यकता नहीं है। KafkaProducer एक सामान्य वर्ग है जिसे अपने उपयोगकर्ता को कुंजी और मूल्य प्रकार निर्दिष्ट करने की आवश्यकता होती है। फिर, निर्माता ProducerRecord . के उदाहरणों को स्वीकार करते हैं जिनके पास समान प्रकार के पैरामीटर हैं। वस्तु से बाइट सरणी में रूपांतरण एक सीरियलाइज़र द्वारा किया जाता है। काफ्का कुछ आदिम धारावाहिक प्रदान करता है:उदाहरण के लिए, IntegerSerializer , ByteArraySerializer , StringSerializer . उपभोक्ता पक्ष पर, समान Deserializers बाइट सरणी को उस ऑब्जेक्ट में परिवर्तित करते हैं जिससे एप्लिकेशन निपट सकता है।

इसलिए सीरियलाइज़र और डिसेरिएलाइज़र स्तर पर हुक करना और निर्माता और उपभोक्ता अनुप्रयोगों के डेवलपर्स को काफ्का द्वारा प्रदान किए गए सुविधाजनक इंटरफ़ेस का उपयोग करने की अनुमति देना समझ में आता है। हालांकि काफ्का के नवीनतम संस्करण ExtendedSerializers . की अनुमति देते हैं और ExtendedDeserializers हेडर तक पहुंचने के लिए, हमने रिकॉर्ड हेडर जोड़ने के बजाय काफ्का रिकॉर्ड की कुंजी और मूल्य में स्कीमा पहचानकर्ता को शामिल करने का निर्णय लिया।

एवरो एसेंशियल्स

एवरो एक डेटा क्रमांकन (और दूरस्थ प्रक्रिया कॉल) ढांचा है। यह डेटा संरचनाओं का वर्णन करने के लिए स्कीमा नामक JSON दस्तावेज़ का उपयोग करता है। अधिकांश एवरो उपयोग GenericRecord या SpecificRecord के उपवर्गों के माध्यम से होता है। एवरो स्कीमा से उत्पन्न जावा वर्ग बाद के उपवर्ग हैं, जबकि पूर्व का उपयोग डेटा संरचना के पूर्व ज्ञान के बिना किया जा सकता है जिसके साथ काम किया गया है।

जब दो स्कीमा संगतता नियमों के एक सेट को संतुष्ट करते हैं, तो एक स्कीमा (लेखक स्कीमा कहा जाता है) के साथ लिखे गए डेटा को पढ़ा जा सकता है जैसे कि यह दूसरे के साथ लिखा गया था (रीडर स्कीमा कहा जाता है)। स्कीमा का एक विहित रूप होता है जिसमें सभी विवरण होते हैं जो क्रमांकन के लिए अप्रासंगिक होते हैं, जैसे टिप्पणियां, तुल्यता जांच में सहायता के लिए हटा दी जाती हैं।

वर्जनेडस्कीमा और स्कीमाप्रोवाइडर

जैसा कि पहले उल्लेख किया गया है, हमें स्कीमा और उनके पहचानकर्ताओं के बीच एक-से-एक मानचित्रण की आवश्यकता है। कभी-कभी स्कीमा को नामों से संदर्भित करना आसान होता है। जब एक संगत स्कीमा बनाया जाता है तो इसे स्कीमा का अगला संस्करण माना जा सकता है। इस प्रकार हम स्कीमा को एक नाम, संस्करण जोड़ी के साथ संदर्भित कर सकते हैं। आइए स्कीमा, उसके पहचानकर्ता, नाम और संस्करण को एक साथ VersionedSchema कहते हैं . इस ऑब्जेक्ट में एप्लिकेशन के लिए आवश्यक अतिरिक्त मेटाडेटा हो सकता है।

public class VersionedSchema {
  private final int id;
  private final String name;
  private final int version;
  private final Schema schema;

  public VersionedSchema(int id, String name, int version, Schema schema) {
    this.id = id;
    this.name = name;
    this.version = version;
    this.schema = schema;
  }

  public String getName() {
    return name;
  }

  public int getVersion() {
    return version;
  }

  public Schema getSchema() {
    return schema;
  }
    
  public int getId() {
    return id;
  }
}

SchemaProvider ऑब्जेक्ट VersionedSchema . के उदाहरण देख सकते हैं ।

public interface SchemaProvider extends AutoCloseable {
  public VersionedSchema get(int id);
  public VersionedSchema get(String schemaName, int schemaVersion);
  public VersionedSchema getMetadata(Schema schema);
}

इस इंटरफ़ेस को कैसे लागू किया जाता है, यह भविष्य के ब्लॉग पोस्ट में "एक स्कीमा स्टोर को लागू करना" में शामिल किया गया है।

जेनेरिक डेटा को क्रमानुसार बनाना

रिकॉर्ड को क्रमबद्ध करते समय, हमें सबसे पहले यह पता लगाना होगा कि किस स्कीमा का उपयोग करना है। प्रत्येक रिकॉर्ड में एक getSchema होता है तरीका। लेकिन स्कीमा से पहचानकर्ता का पता लगाना समय लेने वाला हो सकता है। प्रारंभिक समय पर स्कीमा सेट करना आम तौर पर अधिक कुशल होता है। यह सीधे पहचानकर्ता द्वारा या नाम और संस्करण द्वारा किया जा सकता है। इसके अलावा, कई विषयों के लिए उत्पादन करते समय, हम अलग-अलग विषयों के लिए अलग-अलग स्कीमा सेट करना चाहते हैं और विधि serialize(T, String) के पैरामीटर के रूप में दिए गए विषय नाम से स्कीमा का पता लगा सकते हैं। . संक्षिप्तता और सरलता के लिए यह तर्क हमारे उदाहरणों में छोड़ दिया गया है।

private VersionedSchema getSchema(T data, String topic) {
  return schemaProvider.getMetadata( data.getSchema());
}

हाथ में स्कीमा के साथ, हमें इसे अपने संदेश में संग्रहीत करने की आवश्यकता है। संदेश के हिस्से के रूप में आईडी को क्रमबद्ध करने से हमें एक कॉम्पैक्ट समाधान मिलता है, क्योंकि सारा जादू सीरिएलाइज़र/डिसेरिएलाइज़र में होता है। यह अन्य ढांचे और पुस्तकालयों के साथ बहुत आसान एकीकरण को भी सक्षम बनाता है जो पहले से ही काफ्का का समर्थन करते हैं और उपयोगकर्ता को अपने स्वयं के धारावाहिक (जैसे स्पार्क) का उपयोग करने देता है।

इस दृष्टिकोण का उपयोग करते हुए, हम पहले चार बाइट्स पर पहले स्कीमा पहचानकर्ता लिखते हैं।

private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {
    try (DataOutputStream os = new DataOutputStream(stream)) {
    os.writeInt(id);
  }
}

फिर हम एक DatumWriter बना सकते हैं और वस्तु को क्रमबद्ध करें।

private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
  DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema);
  datumWriter.write(data, encoder);
  encoder.flush();
}

इन सबको मिलाकर, हमने एक सामान्य डेटा सीरिएलाइज़र लागू किया है।

public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> {

  private SchemaProvider schemaProvider;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schemaProvider = SchemaUtils.getSchemaProvider(configs);
  }

  @Override
  public byte[] serialize(String topic, T data) {
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
      VersionedSchema schema = getSchema(data, topic);
   
      writeSchemaId(stream, schema.getId());
      writeSerializedAvro(stream, data, schema.getSchema());
      return stream.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Could not serialize data", e);
    }
  }

  private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...}

  private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...}

  private VersionedSchema getSchema(T data, String topic) {...}

  @Override
  public void close() {
    try {
      schemaProvider.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

जेनेरिक डेटा को डिसेरिएलाइज़ करना

अक्रमांकन एकल स्कीमा के साथ काम कर सकता है (स्कीमा डेटा के साथ लिखा गया था) लेकिन आप एक अलग पाठक स्कीमा निर्दिष्ट कर सकते हैं। पाठक स्कीमा को उस स्कीमा के साथ संगत होना चाहिए जिसके साथ डेटा को क्रमबद्ध किया गया था, लेकिन इसके समकक्ष होने की आवश्यकता नहीं है। इस कारण से, हमने स्कीमा नाम पेश किए। अब हम निर्दिष्ट कर सकते हैं कि हम स्कीमा के विशिष्ट संस्करण के साथ डेटा पढ़ना चाहते हैं। आरंभीकरण के समय हम प्रति स्कीमा नाम के वांछित स्कीमा संस्करण पढ़ते हैं और मेटाडेटा को readerSchemasByName में संग्रहीत करते हैं त्वरित पहुँच के लिए। अब हम स्कीमा के संगत संस्करण के साथ लिखे गए प्रत्येक रिकॉर्ड को पढ़ सकते हैं जैसे कि यह निर्दिष्ट संस्करण के साथ लिखा गया हो।

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
  this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider);
}

जब किसी रिकॉर्ड को डीसेरियलाइज़ करने की आवश्यकता होती है, तो हम पहले लेखक स्कीमा के पहचानकर्ता को पढ़ते हैं। यह पाठक स्कीमा को नाम से देखने में सक्षम बनाता है। दोनों स्कीमा उपलब्ध होने से हम एक GeneralDatumReader . बना सकते हैं और रिकॉर्ड पढ़ें।

@Override
public GenericData.Record deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {

    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);

    VersionedSchema readerSchema =
        readerSchemasByName.get(writerSchema.getName());
    GenericData.Record avroRecord = readAvroRecord(stream,
        writerSchema.getSchema(), readerSchema.getSchema());
    return avroRecord;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private int readSchemaId(InputStream stream ) throws IOException {
  try(DataInputStream is = new DataInputStream(stream)) {
    return is.readInt();
  }
}

private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema,
      readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  GenericData.Record record = new GenericData.Record(readerSchema);
  datumReader.read(record, decoder);
  return record;
}

स्पेसिफिक रिकॉर्ड्स से निपटना

अक्सर एक वर्ग होता है जिसे हम अपने रिकॉर्ड के लिए उपयोग करना चाहते हैं। यह वर्ग तब आमतौर पर एवरो स्कीमा से उत्पन्न होता है। अपाचे एवरो स्कीमा से जावा कोड उत्पन्न करने के लिए उपकरण प्रदान करता है। ऐसा ही एक टूल है एवरो मावेन प्लगइन। जेनरेट की गई कक्षाओं में स्कीमा है जो वे रनटाइम पर उपलब्ध से उत्पन्न हुए थे। यह क्रमांकन और अक्रमांकन को सरल और अधिक प्रभावी बनाता है। क्रमांकन के लिए हम उपयोग करने के लिए स्कीमा पहचानकर्ता के बारे में पता लगाने के लिए कक्षा का उपयोग कर सकते हैं।

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
  try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) {
    Class<?> recordClass = Class.forName(className);
    Schema writerSchema = new
        SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
    this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

इस प्रकार हमें विषय और डेटा से स्कीमा निर्धारित करने के लिए तर्क की आवश्यकता नहीं है। हम रिकॉर्ड लिखने के लिए रिकॉर्ड क्लास में उपलब्ध स्कीमा का उपयोग करते हैं।

इसी तरह, अक्रमांकन के लिए, पाठक स्कीमा को कक्षा से ही पता लगाया जा सकता है। अक्रमांकन तर्क सरल हो जाता है, क्योंकि पाठक स्कीमा विन्यास समय पर तय होता है और इसे स्कीमा नाम से देखने की आवश्यकता नहीं होती है।

@Override
public T deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);
    return readAvroRecord(stream, writerSchema.getSchema(), readerSchema);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  return datumReader.read(null, decoder);
}

अतिरिक्त पठन

स्कीमा संगतता के बारे में अधिक जानकारी के लिए, स्कीमा रिज़ॉल्यूशन के लिए एवरो विनिर्देश देखें।

विहित रूपों के बारे में अधिक जानकारी के लिए, स्कीमा के लिए कैननिकल फॉर्म को पार्स करने के लिए एवरो विनिर्देश देखें।

अगली बार...

भाग 2 एवरो स्कीमा परिभाषाओं को संग्रहीत करने के लिए एक प्रणाली के कार्यान्वयन को दिखाएगा।


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. CDP प्राइवेट क्लाउड बेस 7 बनाम CDH5 में ऑपरेशनल डेटाबेस परफॉर्मेंस इम्प्रूवमेंट

  2. Apache Hadoop आर्किटेक्चर - HDFS, YARN और MapReduce

  3. Hadoop उच्च उपलब्धता सुविधा को समझना

  4. हैशटेबल/सिंकटेबल टूल के साथ HBase क्लस्टर डेटा सिंक्रोनाइज़ेशन

  5. अपाचे HBase प्रतिकृति अवलोकन