समस्या हल हो गई ! मुझे विश्वास नहीं हो रहा है कि मैंने इस पर पूरे दो दिन बिताए हैं... मैं पूरी तरह से गलत दिशा में देख रहा था।
समस्या कुछ डेटाफ़्लो या GCP नेटवर्किंग कॉन्फ़िगरेशन के साथ नहीं थी, और जहाँ तक मैं बता सकता हूँ...
सच है।
समस्या निश्चित रूप से मेरे कोड में थी:केवल वितरित वातावरण में ही समस्या का पता चला था। मैंने श्रमिकों के बजाय मुख्य पाइपलाइन प्रोसेसर से सुरंग खोलने की गलती की थी। तो SSH सुरंग ऊपर थी लेकिन श्रमिकों और लक्ष्य सर्वर के बीच नहीं, केवल मुख्य पाइपलाइन और लक्ष्य के बीच!
इसे ठीक करने के लिए, मुझे सुरंग के साथ क्वेरी निष्पादन को लपेटने के लिए अपना अनुरोध करने वाला DoFn बदलना पड़ा:
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - {}".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
जैसा कि आप देख सकते हैं, मुझे pysql_beam लाइब्रेरी के कुछ बिट्स को ओवरराइड करना पड़ा।
अंत में, प्रत्येक कार्यकर्ता प्रत्येक अनुरोध के लिए अपनी सुरंग खोलता है। शायद इस व्यवहार को अनुकूलित करना संभव है लेकिन यह मेरी ज़रूरतों के लिए पर्याप्त है।