लुइगी में मौजूदा MySqlTarget कार्य पूरा होने पर इंगित करने के लिए एक अलग मार्कर तालिका का उपयोग करता है। मैं यहां एक मोटा तरीका अपनाऊंगा...लेकिन आपका प्रश्न बहुत सारगर्भित है, इसलिए यह वास्तविकता में अधिक जटिल होने की संभावना है।
import luigi
from datetime import datetime
from luigi.contrib.mysqldb import MySqlTarget
class TaskA(luigi.Task):
rundate = luigi.DateParameter(default=datetime.now().date())
target_table = "table_to_update"
host = "localhost:3306"
db = "db_to_use"
user = "user_to_use"
pw = "pw_to_use"
def get_target(self):
return MySqlTarget(host=self.host, database=self.db, user=self.user, password=self.pw, table=self.target_table,
update_id=str(self.rundate))
def requires(self):
return []
def output(self):
return self.get_target()
def run(self):
#update table
self.get_target().touch()
class TaskB(luigi.Task):
def requires(self):
return [TaskA()]
def run(self):
# reading from target_table