मैं एक ही मुद्दे पर था कि आपको कोई समाधान नहीं मिला या नहीं, लेकिन मैं निम्नलिखित करके कुछ ऐसा ही पूरा करने में सक्षम था। सबसे पहले, मैंने अपनी तालिका में ट्रिगर जोड़ा
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
जब भी कोई पंक्ति अद्यतन, हटाई या डाली जाती है, तो यह तालिका पर एक ट्रिगर सेट करेगा। फिर यह मेरे द्वारा सेट किए गए ट्रिगर फ़ंक्शन को कॉल करेगा जो कुछ इस तरह दिखता है:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
यह मुझे मेरे स्प्रिंग बूट प्रोजेक्ट से इनमें से किसी भी अपडेट को 'सुनने' की अनुमति देगा और यह पूरी पंक्ति को पेलोड के रूप में भेजेगा। इसके बाद, मेरे स्प्रिंग बूट प्रोजेक्ट में मैंने अपने डीबी से कनेक्शन कॉन्फ़िगर किया है।
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
इसके साथ मैं इसे अपने सर्विस क्लास में कंस्ट्रक्टर में ऑटोवायर (निर्भरता इंजेक्शन) देता हूं और इसे r2dbc PostgressqlConnection क्लास में इस तरह डाला:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
अब हम अपनी तालिका को 'सुनना' चाहते हैं और अपनी तालिका में कुछ अद्यतन करते समय सूचना प्राप्त करना चाहते हैं। ऐसा करने के लिए हम एक आरंभीकरण विधि सेट करते हैं जो निर्भरता इंजेक्शन के बाद @PostContruct एनोटेशन
का उपयोग करके की जाती है।@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
ध्यान दें कि हम pg_notify मेथड के अंदर जो भी नाम डालते हैं उसे सुनते हैं। इसके अलावा, हम कनेक्शन को बंद करने के लिए एक विधि स्थापित करना चाहते हैं जब बीन फेंकने वाला हो, जैसे:
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
अब मैं बस एक विधि बनाता हूं जो वर्तमान में मेरी तालिका में जो कुछ भी है, उसका एक प्रवाह लौटाता है, और मैं इसे अपनी सूचनाओं के साथ भी मिला देता हूं, जैसा कि मैंने कहा था कि सूचनाएं एक जसन के रूप में आने से पहले, इसलिए मुझे इसे deserialize करना था और मैंने उपयोग करने का निर्णय लिया ऑब्जेक्टमैपर। तो, यह कुछ इस तरह दिखेगा:
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
आशा है कि यह मदद करेगा। चीयर्स!