GSoC 2021: Add subscriptions and email notifications

Personal information

Name: Vijaya Laxmi Durga Alekhya Nynala
IRC nick: Alekhya
GitHub: nynalaalekhya

Proposal

Project overview:

Building a notification service that sends emails to users when an entity they subscribe to has been edited. The entity can be one of the following: author, work, edition, edition group, publisher, or collection. The Nodemailer library is used to send the emails to the subscribers. The architecture of this system should support scaling; Mr_monkey's suggestion (in this thread) to include a message queue such as RabbitMQ would be very helpful in enabling the system to handle a large volume of emails. I have used the amqplib library to manage the RabbitMQ client and to build channels for publishing and consuming messages.

Database changes

A table bookbrainz.subscription should be created; each subscription row references an editor and either an entity or a user_collection (a many-to-one relationship from subscription to those tables).

CREATE TABLE bookbrainz.subscription (
    id UUID PRIMARY KEY DEFAULT public.uuid_generate_v4(),
    type bookbrainz.entity_type NOT NULL,
    editor_id INT NOT NULL,
    bbid UUID,
    collection_id UUID,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT timezone('UTC'::TEXT, now())
);

ALTER TABLE bookbrainz.subscription ADD FOREIGN KEY (bbid) REFERENCES bookbrainz.entity (bbid);

ALTER TABLE bookbrainz.subscription ADD FOREIGN KEY (collection_id) REFERENCES bookbrainz.user_collection (id) ON DELETE CASCADE;

ALTER TABLE bookbrainz.subscription ADD FOREIGN KEY (editor_id) REFERENCES bookbrainz.editor (id);

In the above table, type is the entity type, which is one of the six types: author, work, edition, edition group, publisher, or collection. 'Collection' can be added to the existing bookbrainz.entity_type enum, giving the following definition:

CREATE TYPE bookbrainz.entity_type AS ENUM (
    'Collection',
    'Author',
    'EditionGroup',
    'Edition',
    'Publisher',
    'Work'
);

ORM Changes

A new model, Subscription, is added in bookbrainz-data-js/src/models:

export default function subscription(bookshelf) {
	const Subscription = bookshelf.Model.extend({
		format: camelToSnake,
		idAttribute: 'id',
		parse: snakeToCamel,
		entity() {
			return this.belongsTo('Entity', 'bbid');
		},
		collection() {
			return this.belongsTo('UserCollection', 'collection_id');
		},
		subscriber() {
			return this.belongsTo('Editor', 'editor_id');
		},
		tableName: 'bookbrainz.subscription'
	});

	return bookshelf.model('Subscription', Subscription);
}

API Endpoints

POST /subscription/create

POST /subscription/:subscriptionId/delete

GET /subscriptions
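
A rough sketch of how these endpoints could be registered, assuming an Express router and the existing auth.isAuthenticated middleware (the handler names and the auth import path here are placeholders, not part of the current codebase):

import * as auth from '../helpers/auth'; // assumed path to the existing auth helpers
import express from 'express';

const router = express.Router();

// Placeholder handlers; their logic is described in the Workflow section below
async function handleCreateSubscription(req, res) { /* create a subscription */ }
async function handleDeleteSubscription(req, res) { /* delete a subscription */ }
async function handleGetSubscriptions(req, res) { /* list the editor's subscriptions */ }

// Every subscription route requires a logged-in editor
router.post('/subscription/create', auth.isAuthenticated, handleCreateSubscription);
router.post('/subscription/:subscriptionId/delete', auth.isAuthenticated, handleDeleteSubscription);
router.get('/subscriptions', auth.isAuthenticated, handleGetSubscriptions);

export default router;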

Workflow

A user can subscribe to an entity or a collection by clicking the subscribe button on its page, or automatically by editing the entity.

After clicking or editing, the POST /subscription/create endpoint is called with the type and id passed in the request body.

For example: if the user is subscribing to a collection, then type='Collection' and id=collectionId. If the user is subscribing to a work, then type='Work' and id=bbid.
The endpoint should use the auth.isAuthenticated middleware. A new subscription with the corresponding editor_id and bbid or collection_id will be created.

const newSubscription = new Subscription({
	editorId: req.user.id,
	type: req.body.type
});
if (isCollection) {
	newSubscription.set('collectionId', req.body.id);
}
else {
	newSubscription.set('bbid', req.body.id);
}
await newSubscription.save(null, {method: 'insert'});

When a user edits a subscribed entity or a collection, the helper function shown below is called with the corresponding bbid or collection_id to get all users who have subscribed to it.

Get the subscriptions with the corresponding bbid or collection_id:

const allSubscriptions = await new Subscription()
	.where((builder) => {
		if (isCollection) {
			builder.where('collection_id', '=', id);
		}
		else {
			builder.where('bbid', '=', id);
		}
	})
	.orderBy('created_at')
	.fetchAll({
		withRelated: 'subscriber'
	});

Then get the list of user ids from the above subscriptions:

const allUsers = allSubscriptions.map((subscription) => subscription.related('subscriber').get('id'));

After getting the users, we need a method to retrieve the associated users' registered email addresses. An object named email containing subject, to (the registered address retrieved above) and body (containing the URL of the entity or collection and the name of the user who modified it) is created.
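
A possible sketch of this step; getEditorEmail is a hypothetical helper that looks up an editor's registered address, and entityUrl and editorName are assumed to come from the edit being processed:

// Build one email object per subscriber (getEditorEmail is a hypothetical lookup helper)
const emails = await Promise.all(allUsers.map(async (editorId) => {
	const to = await getEditorEmail(editorId);
	return {
		body: `${editorName} edited ${entityUrl}`,
		subject: 'An item you subscribe to on BookBrainz has been edited',
		to
	};
}));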

Refer to the following figure: the publisher publishes the email objects to the queue, and the consumer takes one email object at a time and sends it using the sendEmail() function.

Run a RabbitMQ container using Docker:

docker run -d --hostname my-rabbit -p 5672:5672 --name rabbitmq rabbitmq:3

Install amqplib
npm install amqplib

Establishing a connection with RabbitMQ:

import amqp from 'amqplib';

let amqpConnection;
async function connect() {
    try {
        const connection = await amqp.connect(process.env.CLOUDAMQP_URL);

        connection.on("error", err => {
            console.error("[AMQP] conn error", err.message);
            setTimeout(connect, 1000);
        });
        connection.on("close", () => {
            console.error("[AMQP] reconnecting");
            setTimeout(connect, 1000);
        });

        console.log("[AMQP] connected");
        amqpConnection = connection;

    } catch (err) {
        console.error("[AMQP]", err.message);
        setTimeout(connect, 1000);
    }
}

Setting up a publisher to send emails on a queue

let publisher;
async function startPublisher() {
    try {
        const channel = await amqpConnection.createConfirmChannel();
        publisher = channel;
        channel.on("error", err => {
            console.error("[AMQP] channel error", err.message);
        });
        channel.on("close", () => {
            console.log("[AMQP] channel closed");
        });


    } catch (err) {
        if (closeOnErr(err)) return;
        console.error("[AMQP] channel error", err.message);
    }
}


async function publish(exchange, routingKey, content) {
    try {
        publisher.publish(exchange, routingKey, content, {
            persistent: true
        });
    } catch (e) {
        console.error("[AMQP] publish", e.message);
        publisher.connection.close();
    }
}

The publish function is called on every email object in the array, to send it to the jobs queue:

publish('', 'jobs', Buffer.from(JSON.stringify(data), 'utf-8'));

Creating a consumer to fetch the emails from the queue

The number of unacknowledged emails fetched at a time can be set using channel.prefetch(number);

let consumer;
async function startWorker() {
    try {
        consumer = await amqpConnection.createChannel();

        consumer.on("error", err => {
            console.error("[AMQP] channel error", err.message);
        });
        consumer.on("close", () => {
            console.log("[AMQP] channel closed");
        });

        consumer.prefetch(1);

        await consumer.assertQueue("jobs", {
            durable: true
        });

        consumer.consume("jobs", work, {
            noAck: false
        });

    } catch (err) {
        if (closeOnErr(err)) return;
    }
}

The work function extracts the body, to and subject fields and calls the sendEmail method in the mailer service.

async function work(msg) {
    try {
        const message = msg.content.toString();
        const email = JSON.parse(message);
        const body = email.body;
        const subject = email.subject;
        const to = email.to;
        // Sender address: same account as the transporter configuration below
        const from = process.env.EMAIL;

        await sendEmail(from, to, subject, body);

        consumer.ack(msg);
    } catch (e) {
        closeOnErr(e);
    }
}

sendEmail

The sendEmail method is defined as shown below

import nodemailer from 'nodemailer';

import hbs from 'nodemailer-handlebars';

let transporter = nodemailer.createTransport({
    service: 'gmail',
    auth: {
        user: process.env.EMAIL || 'abc@gmail.com', // TODO: gmail account 
        pass: process.env.PASSWORD || '1234' // TODO: gmail password
    }
});

transporter.use('compile', hbs({
    viewEngine: 'express-handlebars',
    viewPath: './views/'
}));

export async function sendEmail(from, to, subject, body) {
    const mailOptions = {
        from, // email sender
        to, // email receiver
        subject,
        template: 'index',
        context: {
            body,
            name: 'Hi, this is an email from BookBrainz'
        } // extra values passed to the template
    };

    await transporter.sendMail(mailOptions);
}

We use nodemailer-handlebars to render HTML templates for the emails. The from address should be configured.

closeOnErr
The closeOnErr function is defined as follows:

function closeOnErr(err) {
    if (!err) return false;
    console.error("[AMQP] error", err);
    amqpConnection.close();
    return true;
}
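
For completeness, a minimal sketch of how these pieces could be started together when the notification service boots, assuming connect() is adjusted to resolve once amqpConnection is set:

async function startNotificationService() {
    // 1. Establish the AMQP connection (with its own retry logic)
    await connect();
    // 2. Open the confirm channel used to publish email jobs
    await startPublisher();
    // 3. Open the consumer channel that feeds email jobs to sendEmail()
    await startWorker();
}

startNotificationService().catch((err) => console.error('[AMQP] startup failed', err));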

Retry queue

If a message cannot be published due to a network error, it could be lost. To resolve this issue, a retry queue is added:

let offlinePubQueue = [];

The message is added to the queue when an error occurs; this is handled in the publish function:

async function publish(exchange, routingKey, content) {
    try {
        publisher.publish(exchange, routingKey, content, {
            persistent: true
        });
    } catch (e) {
        console.error("[AMQP] publish", e.message);
        offlinePubQueue.push([exchange, routingKey, content]);
        publisher.connection.close();
    }
}

Loop through all the emails in offlinePubQueue and publish them when the connection is re-established and the publisher is started (in the publisher code):

while (true) {
    const element = offlinePubQueue.shift();
    if (!element) break;
    publish(element[0], element[1], element[2]);
}

My Subscriptions

Endpoint
/subscriptions?from=from&size=size

Get all subscriptions of the logged-in editor:

const mySubscriptions = await new Subscription()
	.where((builder) => {
		builder.where('editor_id', '=', req.user.id);
	})
	.orderBy('created_at')
	.fetchPage({
		limit: parseInt(req.query.size, 10),
		offset: parseInt(req.query.from, 10)
	});

My Subscriptions UI

Generating props and passing them to the subscription page component:

const props = generateProps(req, res, {
	mySubscriptions,
	from,
	size,
	userId,
	tableHeading: 'My Subscriptions'
});

const markup = ReactDOMServer.renderToString(
	<Layout {...propHelpers.extractLayoutProps(props)}>
		<SubscriptionPage
			{...propHelpers.extractChildProps(props)}
		/>
	</Layout>
);

const script = '/js/subscription.js';

return res.send(target({
	markup,
	props: escapeProps(props),
	script
}));

Delete subscription

Endpoint
/subscription/:subscriptionId/delete

Delete the subscription with the corresponding id (unsubscribe):

await new Subscription({id: subscriptionId}).destroy();
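
A sketch of the corresponding route handler, reusing the router, auth and Subscription names from the sketches above; the ownership check (only the subscriber can remove their own subscription) is my assumption rather than part of the endpoint description above:

router.post('/subscription/:subscriptionId/delete', auth.isAuthenticated, async (req, res, next) => {
	try {
		const subscription = await new Subscription({id: req.params.subscriptionId})
			.fetch({require: true});
		// Only the editor who created the subscription may delete it
		if (subscription.get('editorId') !== req.user.id) {
			return res.status(403).send({error: 'Not allowed'});
		}
		await subscription.destroy();
		return res.status(200).send();
	}
	catch (err) {
		return next(err);
	}
});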

Timeline

Community Bonding Period

Taking advice from my mentor about the subscription page design and the template and content of the email notifications. Getting more familiar with the codebase and discussing the project schedule.

Week 1-2 :

Creating the subscription table and adding 'Collection' to the entity_type enum. Implementing the associated Subscription model.

Week 3-4:

Developing the /subscription/create, /subscription/:subscriptionId/delete, and /subscriptions endpoints and making the necessary backend changes.

Week 5-7:

Setting up RabbitMQ, writing the publisher and consumer code, adding the nodemailer and handlebars dependencies, setting up the mailer service, and writing unit tests.

Week 8 :

Designing and developing the My Subscriptions page and adding unsubscribe functionality to the user interface.

Week 9 :

Adding retry queue functionality to the project and writing unit tests using the mocha and chai libraries.

Week 10 :

Updating the Docker files to run RabbitMQ, updating the corresponding documentation, and adding the new endpoints to the Swagger file.

Extended Goal

  1. Adding notification alerts to the website.

  2. Adding preferences to enable/disable email notifications and website notifications (as discussed above) to the My Subscriptions page. An additional endpoint /subscription/:subscriptionId/edit has to be created.
  3. Functionality to enable/disable auto-subscribing to an entity by associating every user with a preference and checking it at the time of subscription creation (a rough sketch follows below).
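
A rough sketch of the auto-subscribe check; the autoSubscribe preference flag on the editor is hypothetical and not part of the current schema:

// Called after an editor saves an edit to an entity
async function maybeAutoSubscribe(editor, bbid, type) {
	// editor.autoSubscribe is a hypothetical per-editor preference
	if (!editor.autoSubscribe) {
		return;
	}
	await new Subscription({
		bbid,
		editorId: editor.id,
		type
	}).save(null, {method: 'insert'});
}

In practice a duplicate check would also be needed so that an editor is not subscribed to the same entity twice.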

Detailed information about yourself

I am a graduate student at San Jose State University pursuing a master’s degree in Computer Engineering.
Here is the list of PRs I have opened in the bookbrainz repo.

  • Tell us about the computer(s) you have available for working on your SoC project!
I will be using a Lenovo IdeaPad S340 (i7).

  • When did you first start programming?
I started programming in C during the first semester of my bachelor's degree. Later I learned JavaScript to participate in hackathons.

  • What type of music do you listen to? (Please list a series of MBIDs as examples.)
I love listening to One Direction albums, my favorite being "Drag Me Down".

  • If applying for a BookBrainz project: what type of books do you read? (Please list a series of BBIDs as examples. (And feel free to also list music you listen to!))
    I enjoy reading works of fiction and fantasy. I read the Harry Potter book series.
  • What aspects of the project you’re applying for (e.g., MusicBrainz, AcousticBrainz, etc.) interest you the most?
BookBrainz has a seamless interface for finding the desired works, and it is easy to retrieve the corresponding details and ISBN or other associated identifiers.

  • Have you contributed to other Open Source projects? If so, which projects and can we see some of your code?
Yes, I have contributed to the Internet Archive's Open Library, where I worked on increasing the WAVE accessibility score, moving inline JS, an Intersection Observer fix, and barcode reader functionality.

  • What sorts of programming projects have you done on your own time?
    Amazon Clone: ReactJS, Redux, Material UI, Stripe.js, Node.js, MongoDB, Express, Mongoose, Firebase
    Spotify based audio player: ReactJS, React Context API, Storybook, AWS S3, Firebase
    OTP based ticketing system: EJS, Node.js, Express, PostgreSQL, Passport.js, Twilio, Stripe.js, Postman, Heroku

  • How much time do you have available, and how would you plan to use it?
I will work on completing the project throughout the summer. My courses will begin at the end of August, so I would be able to devote 35-40 hours a week to the project.

  • Do you plan to have a job or study during the summer in conjunction with Summer of Code?
    No. I plan to dedicate my time and solely work on this project.

Hi @mr_monkey,
This is my GSOC project proposal. Please review the document and suggest any changes. I will be glad to hear your feedback.

Thanks for your proposal @alexman123

Here are some suggestions and questions:

"A table bookbrainz.subscription […] has a one to one relationship with the editor […]”

I'm not sure that's what you meant. A one-to-one relationship between the subscription and editor would mean that there is only one subscription per editor, which I assume wouldn't be the case.

Less importantly, user_id should be editor_id as we have in the rest of the schema.

For the subscription table, the SQL does not match the schema diagram you have (user_id/editor_id is missing)
I can however already point out that I wouldn’t expect there to be a separate collection_id and bbid columns. They are both UUIDs, so I don’t see the reason to have them separate. What did you have in mind?

The subscription_type looks a lot like the existing entity_type enum. I wonder if we could just add “Collection” to this existing enum.

When a user edits a subscribed entity or a collection, a function is called with corresponding bbid or collection_id.

This is a bit too vague. Where does that function live, and what does it do?

Not sure why there is both Promise and async/await syntax in the email sending mechanism.
It would be good to only use async/await for clarity. For example, I think sendEmail(from, to, body, subject); resolve(true); is not the correct way to handle Promises. You would probably have to return sendEmail(… and follow it with then/catch. Let's standardize on async/await, which is simpler to read and maybe less error-prone.
I'd want to see some form of error handling around that too.

I like the retry queue, that’s good redundancy to have in mind!
How would it fit in with the RMQ flow, meaning when would the offlinePubQueue be processed?
It looks to me like the first thing you do in that process is remove the item from the queue (offlinePubQueue.shift()); if publish fails again that message would be lost.

Get all subscriptions with corresponding user id

There’s no user id in /subscriptions?from=from&size=size. Did you mean /user/$userID/subscriptions?from=from&size=size ? Or is the user id based on the logged in user?

50 hours a week looks like a lot, and probably not attainable (and if attainable, not good for you :slight_smile: ) Let’s cap that at 35-40h, which I think is more realistic.

Thank you @mr_monkey. I have addressed the issues; could you please review them?

Yeah, it seems to be missing. Added it to the subscription table

That would make it simpler! I added the 'Collection' type to the existing entity_type enum.

This refers to the helper function (shown in the proposal above) that gets the subscriptions for the associated id and then retrieves the users who subscribed.

Modified it to async/await and used try/catch for error handling.

offlinePubQueue would be processed when the publisher starts, as shown in the code below:

async function startPublisher() {
    try {
        const channel = await amqpConnection.createConfirmChannel();
        publisher = channel;
        channel.on("error", err => {
            console.error("[AMQP] channel error", err.message);
        });
        channel.on("close", () => {
            console.log("[AMQP] channel closed");
        });


        while (true) {
            const element = offlinePubQueue.shift();
            if (!element) break;
            publish(element[0], element[1], element[2]);
        }

    } catch (err) {
        if (closeOnErr(err)) return;
        console.error("[AMQP] channel error", err.message);
    }
}

If publishing fails again, the message is added back to the queue, since the publish method handles that:

async function publish(exchange, routingKey, content) {
    try {
        publisher.publish(exchange, routingKey, content, {
            persistent: true
        });
    } catch (e) {
        console.error("[AMQP] publish", e.message);
        offlinePubQueue.push([exchange, routingKey, content]);
        publisher.connection.close();
    }
}

As mentioned in the code, it is based on the logged-in user's id (editor_id = req.user.id).

Changed this content, and renamed user_id to editor_id.

Keeping them separate makes it easier to reference the corresponding tables: entity and user_collection.

That's right :slight_smile: Changed it to 35-40 hours.